Skip to content

Instantly share code, notes, and snippets.

@DoMINAToR98
Created November 13, 2021 23:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DoMINAToR98/58197cb7913ed344f4b53a349df24154 to your computer and use it in GitHub Desktop.
Save DoMINAToR98/58197cb7913ed344f4b53a349df24154 to your computer and use it in GitHub Desktop.
Slack Automation
#!/usr/bin/env python3
import requests
import json
import argparse
import sys
# Slack Web API endpoints. All of them live under the same root URL.
_SLACK_API_ROOT = "https://slack.com/api/"
# NOTE: conversationList is not referenced by the code visible in this file.
conversationList = _SLACK_API_ROOT + "conversations.list"
postMessage = _SLACK_API_ROOT + "chat.postMessage"
fileUpload = _SLACK_API_ROOT + "files.upload"

# NOTE(review): "key" and the channel mapping below look like placeholders
# that must be replaced before use -- confirm.
apiKey = "key"
# Maps the channel name accepted on the command line to the Slack channel ID.
channelIDs = dict(channel_name="channel_id")

# Request headers: JSON bodies for chat.postMessage, form-encoded bodies for
# files.upload. Both carry the same bearer token.
headers = {'Authorization': "Bearer " + apiKey, 'Content-Type': 'application/json'}
headersForFile = dict(headers, **{'Content-Type': 'application/x-www-form-urlencoded'})
def firstNotification(target: str):
    """Open a new thread for *target* in every configured channel.

    Posts ``*target*`` to each channel in ``channelIDs`` and stores the ``ts``
    value of every posted message in ``.timestamps``. The file is opened in
    write mode, so any previously recorded threads are replaced.

    Entries are keyed by channel *name*, because ``notify`` and ``sendReport``
    call ``getTimeStamp`` with the name, not the Slack channel ID.

    Failures (network errors, malformed replies, Slack rejecting the call)
    are printed to stdout; nothing is raised to the caller. If any channel
    fails, ``.timestamps`` is left untouched.
    """
    try:
        timestamps = {}
        for name, channel_id in channelIDs.items():
            payload = json.dumps({"channel": channel_id, "text": "*{target}*".format(target=target)})
            con = requests.post(url=postMessage, headers=headers, data=payload)
            try:
                resp = json.loads(con.text)
            finally:
                con.close()
            # Slack reports API-level failures in the body ("ok": false), so
            # surface its error code instead of a bare KeyError on 'ts'.
            if not resp.get("ok"):
                raise RuntimeError("Slack API error for channel {name}: {error}".format(
                    name=name, error=resp.get("error")))
            timestamps[name] = resp['ts']
        with open('.timestamps', 'wb') as o:
            # Trailing newline keeps the file line-oriented, so records that
            # startThread appends later begin on their own line.
            o.write(json.dumps(timestamps).encode() + b"\n")
        print("Thread {target} started in all channels.".format(target=target))
    except Exception as e:
        print("Error in web request. Try again later.")
        print(e)
def startThread(channel: str, target: str):
    """Open a new thread for *target* in one channel.

    Posts ``*target*`` to the channel named *channel* and appends a one-entry
    JSON object ``{channel: ts}`` as a new line to ``.timestamps``. Does
    nothing when *channel* is not a key of ``channelIDs``. Any exception is
    printed to stdout rather than propagated.
    """
    recorded = {}
    try:
        if channel not in channelIDs.keys():
            return
        payload = json.dumps(
            {"channel": channelIDs.get(channel), "text": "*{target}*".format(target=target)}
        )
        response = requests.post(url=postMessage, headers=headers, data=payload)
        reply = json.loads(response.text)
        response.close()
        recorded[channel] = reply['ts']
        # Append mode: earlier records in the file are kept.
        with open('.timestamps', 'ab') as out:
            out.write(json.dumps(recorded).encode() + b"\n")
        print("Thread {target} started in channel {channel}.".format(target=target, channel=channel))
    except Exception as e:
        print("Error in web request. Try again later.")
        print(e)
def getTimeStamp(channel: str):
    """Return the recorded thread timestamp for *channel*.

    ``.timestamps`` holds one JSON object per line, each mapping channel keys
    to the ``ts`` of a thread-opening message. Lines are merged in file order
    so that the most recently appended record for a channel wins; blank lines
    are ignored.

    Returns:
        The stored timestamp for *channel*.

    On any failure (file missing or unreadable, invalid JSON, channel not
    recorded) a diagnostic is printed and the process exits with status 69.
    """
    try:
        with open('.timestamps', 'rb') as f:
            lines = f.readlines()
        merged = {}
        for line in lines:
            line = line.strip()
            if line:
                # Later lines override earlier ones: startThread appends, so
                # the newest thread for a channel is nearest the end.
                merged.update(json.loads(line))
        if channel not in merged:
            raise Exception("Invalid Channel ?")
        return merged[channel]
    except Exception as e:
        print("File .timestamps not found. Was the first notification ever sent?\n"+str(e))
        sys.exit(69)
def notify(msg: str, channel: str):
    """Post *msg* as a reply in the recorded thread of *channel*.

    *channel* must be a key of ``channelIDs``; otherwise a message is printed
    and nothing is sent. The thread is identified by the timestamp that
    ``getTimeStamp`` returns for the channel.

    "Notification Sent" is printed only when the HTTP status is 200 *and* the
    reply body reports ``"ok": true``. Every other outcome, including network
    errors, is written to the channel's log file via ``logError``.
    """
    if channel not in channelIDs:
        print("Invalid Channel. Aborting.")
        return
    # Look up the thread separately from the network call. requests'
    # exceptions derive from OSError, so a shared handler would report
    # connection failures as a missing file.
    try:
        ts = getTimeStamp(channel)
    except OSError:
        print("FILE NOT FOUND.")
        sys.exit()
    try:
        payload = json.dumps({"channel": channelIDs[channel], "thread_ts": ts, "text": msg})
        con = requests.post(url=postMessage, headers=headers, data=payload)
        try:
            status = con.status_code
            body = con.text
        finally:
            con.close()
        if status != 200:
            raise RuntimeError("Non 200 Status Code: " + str(status))
        # Slack answers HTTP 200 even for rejected calls; the verdict is in
        # the "ok" field of the JSON body.
        reply = json.loads(body)
        if not reply.get("ok"):
            raise RuntimeError("Slack API error: " + str(reply.get("error")))
        print("Notification Sent")
    except Exception as e:
        print("Error sending notification. Logging.")
        logError(e, channel, msg)
def sendReport(channel: str, reportFile: str, fileName: str, nonThread: bool):
    """Upload the contents of *reportFile* to *channel*.

    Args:
        channel: Channel name; must be a key of ``channelIDs``, otherwise the
            call does nothing.
        reportFile: Path of the file whose bytes are sent as ``content``.
        fileName: Value sent to Slack in the ``filename`` field.
        nonThread: When false, the upload is attached to the channel's
            recorded thread (``thread_ts`` from ``getTimeStamp``); when true
            it is sent without ``thread_ts``.

    "Report sent successfully" is printed only for an HTTP 200 reply whose
    body reports ``"ok": true``; any other outcome is logged via ``logError``.

    NOTE(review): Slack has announced the retirement of ``files.upload`` in
    favour of the external-upload methods -- confirm whether this endpoint is
    still available for the workspace.
    """
    if channel not in channelIDs:
        return
    try:
        data = {"channels": channelIDs[channel], "filename": fileName}
        if not nonThread:
            data["thread_ts"] = getTimeStamp(channel)
        # Context manager so the report's file handle is always released.
        with open(reportFile, 'rb') as report:
            data["content"] = report.read()
        conn = requests.post(fileUpload, headers=headersForFile, data=data)
        try:
            status = conn.status_code
            body = conn.text
        finally:
            conn.close()
        if status != 200:
            raise RuntimeError("Non 200 Status Code: " + str(status))
        # Slack signals API-level failure in the body, not the HTTP status.
        reply = json.loads(body)
        if not reply.get("ok"):
            raise RuntimeError("Slack API error: " + str(reply.get("error")))
        print("Report sent successfully")
    except Exception as e:
        print("Error sending report file.")
        logError(e, channel, reportFile)
def nonThreadNotify(channel: str, msg: str):
    """Post ``*msg*`` to *channel* as a standalone message (no thread).

    *channel* must be a key of ``channelIDs``; otherwise the call does
    nothing. The message is addressed to the mapped channel ID, as in the
    other senders in this file. A non-200 status or a reply with
    ``"ok": false`` is treated as an error and logged via ``logError``.
    """
    if channel not in channelIDs:
        return
    try:
        payload = json.dumps({"channel": channelIDs[channel], "text": "*{message}*".format(message=msg)})
        con = requests.post(url=postMessage, headers=headers, data=payload)
        try:
            status = con.status_code
            body = con.text
        finally:
            # Close on the error paths too.
            con.close()
        if status != 200:
            raise Exception("Non 200 Status Code: "+str(status))
        # Slack answers HTTP 200 even when it rejects the call.
        reply = json.loads(body)
        if not reply.get("ok"):
            raise Exception("Slack API error: "+str(reply.get("error")))
    except Exception as e:
        print("An error has occurred.")
        logError(e, channel, msg)
def logError(e: Exception, channel: str, msg: str):
    """Record a failure in ``ts-<channel>.log``.

    The file is opened in write mode, so it ends up holding only this entry:
    *msg*, a newline, then ``Error: `` followed by ``str(e)``.
    """
    log_path = 'ts-{channel}.log'.format(channel=channel)
    with open(log_path, 'w') as handle:
        entry = msg + '\nError: ' + str(e)
        handle.write(entry)
# Command-line entry point: parse the options and run exactly one action.
parser = argparse.ArgumentParser(description="Send alerts and updates on Slack.")
parser.add_argument("--target", nargs=1, type=str, action='store', dest="target")
parser.add_argument("--channel", nargs=1, type=str, action='store', dest="channel", choices=channelIDs.keys())
parser.add_argument("--report", nargs=1, type=str, action='store', dest="fileName")
parser.add_argument("--message", nargs=1, type=str, action='store', dest="message")
parser.add_argument("--first", action="store_true")
parser.add_argument("--single-channel", action="store_true")
parser.add_argument("--non-thread-notify", action="store_true")
if not(len(sys.argv) > 1):
    parser.print_help()
    sys.exit(1)
args = parser.parse_args()

# Dispatch on the parsed namespace rather than on raw sys.argv strings, so
# that "--report=path" and argparse's unambiguous abbreviations select the
# same action as the spelled-out forms. Companion options are validated up
# front; parser.error() prints the usage text and exits with status 2.
if args.first:
    if args.target is None:
        parser.error("--first requires --target")
    firstNotification(args.target[0])
    sys.exit(0)
elif args.fileName is not None:
    if args.channel is None:
        parser.error("--report requires --channel")
    # The uploaded file is always named "sample" on the Slack side.
    sendReport(channel=args.channel[0], fileName="sample", reportFile=args.fileName[0],
               nonThread=args.non_thread_notify)
    sys.exit(0)
elif args.single_channel:
    if args.target is None or args.channel is None:
        parser.error("--single-channel requires --target and --channel")
    startThread(target=args.target[0], channel=args.channel[0])
    sys.exit(0)
elif args.non_thread_notify:
    if args.channel is None or args.message is None:
        parser.error("--non-thread-notify requires --channel and --message")
    nonThreadNotify(channel=args.channel[0], msg=args.message[0])
else:
    if args.channel is None or args.message is None:
        parser.error("sending a notification requires --channel and --message")
    notify(msg=args.message[0], channel=args.channel[0])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment