Created
May 12, 2021 17:38
-
-
Save casdr/258618cf6412dbf2637b416be396c540 to your computer and use it in GitHub Desktop.
SIA-HA to IRC script (erg lelijk)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import re | |
import socket | |
from datetime import datetime | |
import pyshark | |
import requests | |
from requests.auth import HTTPDigestAuth | |
import copy | |
import time | |
# Destination of the UDP datagrams emitted by send_msg(), plus the secret
# that is placed at the start of every datagram.
IRC_IP = ''
IRC_PORT = 47774
IRC_PASS = ''

# Colour control sequences: the \003 control character followed by a colour
# number (presumably mIRC-style colour codes, since the text ends up on IRC).
COLORS = dict(
    reset='\003',
    red='\0034',
    green='\0039',
    blue='\00310',
    darkblue='\00312',
    yellow='\0038',
    orange='\0037',
    gray='\00394',
)

# Colour per level key: '0' -> green, '1' -> yellow, '2' -> red.
COLOR_MAP = {
    str(level): COLORS[name]
    for level, name in enumerate(('green', 'yellow', 'red'))
}

# Camera name -> host part of its snapshot URL (see grab_img); left blank here.
cams = {
    'Poort In': '',
    'Afdak L': '',
    'Afdak R': '',
    'Entree': '',
}

# Intercom name -> host part of its snapshot URL (see grab_intercom).
intercoms = {'Voordeur': ''}
# HTTP Digest credentials handed to requests when grab_img() pulls a snapshot.
cam_auth = HTTPDigestAuth('user', 'pass')

# One shared, connectionless IPv4 socket; send_msg() writes every message to it.
udp = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

# Most recently relayed event; the capture loop compares new events against it
# so that an identical event is not announced twice in a row.
OLD_DATA = dict()
def send_msg(msg, channels="#xs", prefix="Toegang"):
    """Send *msg* to one or more IRC channels through the UDP socket.

    One datagram is sent per channel to ``(IRC_IP, IRC_PORT)``, laid out as
    ``<IRC_PASS> <channel> <darkblue>[<prefix>]<reset> <msg>``.

    Args:
        msg: Text to deliver.
        channels: Comma-separated list of channel names.
        prefix: Tag shown in square brackets in front of the message.
    """
    for channel in channels.split(","):
        payload = '%s %s %s[%s]%s %s' % (
            IRC_PASS, channel, COLORS['darkblue'], prefix, COLORS['reset'], msg)
        # socket.sendto() requires a bytes-like object on Python 3. On
        # Python 2 a plain str already is bytes and is sent untouched, so the
        # original behaviour there is preserved.
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        udp.sendto(payload, (IRC_IP, IRC_PORT))
def grab_img(cam='', channels="#xs", timeout=10):
    """Fetch a snapshot from camera *cam*, upload it and announce the link.

    The image is requested from ``http://<cams[cam]>/cgi-bin/snapshot.cgi``
    using the module-level ``cam_auth`` credentials, posted to the paste
    service, and the resulting URL is sent to *channels* via send_msg().

    Args:
        cam: Key into ``cams`` naming the camera.
        channels: Comma-separated IRC channels to notify.
        timeout: Seconds to wait on each HTTP request. Without a timeout an
            unresponsive camera would block the caller indefinitely.

    Returns:
        None on success, or the string "Failed fetching image" when any step
        raised (unknown camera, network error, HTTP error status, ...).
    """
    try:
        img = requests.get('http://%s/cgi-bin/snapshot.cgi' % cams[cam],
                           auth=cam_auth, timeout=timeout)
        # Do not upload an HTTP error page as if it were a picture.
        img.raise_for_status()
        # ttl 2592000 == 30 * 24 * 3600; presumably seconds -- confirm
        # against the paste service.
        files = {'content': img.content, 'ttl': 2592000}
        paste = requests.post('https://pastebin.local', data=files,
                              allow_redirects=True, timeout=timeout)
        paste.raise_for_status()
        send_msg("Grabbed image for %s: %s?.png" % (cam, paste.url),
                 channels=channels, prefix="Webcam")
    except Exception as e:
        # Best effort: a failed snapshot must never take down the caller.
        print(e)
        return "Failed fetching image"
def grab_intercom(cam='', channels="#xs", timeout=10):
    """Fetch the current frame from intercom *cam*, upload it and announce it.

    The image is requested from ``http://<intercoms[cam]>/record/current.jpg``
    with a ``(user, password)`` tuple (which requests sends as HTTP Basic
    auth), posted to the paste service, and the resulting URL is sent to
    *channels* via send_msg().

    Args:
        cam: Key into ``intercoms`` naming the intercom.
        channels: Comma-separated IRC channels to notify.
        timeout: Seconds to wait on each HTTP request. Without a timeout an
            unresponsive intercom would block the caller indefinitely.

    Returns:
        None on success, or the string "Failed fetching image" when any step
        raised (unknown intercom, network error, HTTP error status, ...).
    """
    try:
        img = requests.get('http://%s/record/current.jpg' % intercoms[cam],
                           auth=('user', 'pass'), timeout=timeout)
        # Do not upload an HTTP error page as if it were a picture.
        img.raise_for_status()
        # ttl 2592000 == 30 * 24 * 3600; presumably seconds -- confirm
        # against the paste service.
        files = {'content': img.content, 'ttl': 2592000}
        paste = requests.post('https://pastebin.local', data=files,
                              allow_redirects=True, timeout=timeout)
        paste.raise_for_status()
        send_msg("Grabbed image for %s: %s?.png" % (cam, paste.url),
                 channels=channels, prefix="Webcam")
    except Exception as e:
        # Best effort: a failed snapshot must never take down the caller.
        print(e)
        return "Failed fetching image"
def decode_payload(hex_payload):
    """Turn the hex dump of one UDP payload into its de-obfuscated text.

    The byte at offset 16 (hex digits 32-33) is used as a one-byte XOR key
    that is applied to every byte of the payload.

    Args:
        hex_payload: Payload as a string of hex digits, as read from
            ``packet.data.data``.

    Returns:
        The XOR-decoded payload as a native string, or None when the input is
        not valid hex or is too short to contain the key byte.
    """
    try:
        raw = bytearray.fromhex(hex_payload)
    except (TypeError, ValueError):
        return None
    if len(raw) < 17:
        # No key byte present. The previous code ended up calling
        # int("0x", 16) here and crashed the whole capture loop.
        return None
    key = raw[16]
    decoded = bytes(bytearray(b ^ key for b in raw))
    if isinstance(decoded, str):
        # Python 2: bytes is str, nothing more to do.
        return decoded
    # Python 3: map each byte 1:1 onto a code point so nothing is lost.
    return decoded.decode('latin-1')


def parse_event(line):
    """Extract the fields used by the relay from one decoded, '^'-separated line.

    Args:
        line: Decoded payload text.

    Returns:
        A dict with keys ``user``, ``user_id``, ``id``, ``sia`` and
        ``comment``. ``user`` defaults to "Unknown"; the others default to
        None when the corresponding field is absent.
    """
    event = {
        "user": "Unknown",
        "user_id": None,
        "id": None,
        "sia": None,
        "comment": None,
    }
    fields = line.split("^")
    if len(fields) > 5:
        # Drop the leading field. The original `sp = sp.pop(0)` replaced the
        # list by its first element (a string), so such lines were indexed
        # character by character and never parsed.
        # NOTE(review): confirm that skipping the first field is the intent.
        fields = fields[1:]

    if len(fields) > 1 and fields[1].strip() != "":
        event['user'] = fields[1].strip()

    # The user id is whatever follows the first "id" marker in the head field.
    head_parts = fields[0].split("id")
    if len(head_parts) > 1:
        event['user_id'] = head_parts[1]

    if len(fields) > 2:
        code = fields[2].strip()
        event['id'] = code[3:]      # everything after the event code
        event['sia'] = code[1:3]    # two-letter event code
    if len(fields) > 3:
        event['comment'] = fields[3].strip()
    return event


def grab_door_images(reader_id):
    """Post the snapshots that belong to access reader *reader_id*, if any."""
    if reader_id == '6202':
        grab_img("Poort In")
    elif reader_id == '5202':
        grab_intercom("Voordeur")
        grab_img("Afdak L")


def announce_event(data):
    """Send the IRC message (and snapshots) that match ``data['sia']``.

    Args:
        data: Event dict as produced by parse_event().
    """
    sia = data['sia']
    comment = data['comment']
    red, green, reset = COLORS['red'], COLORS['green'], COLORS['reset']
    both = "#xs,#brbd"

    if sia in ('BA', 'FA'):
        send_msg("%sALARM%s: %s" % (red, reset, comment),
                 prefix="Alarm", channels=both)
        grab_img("Entree", channels=both)
    elif sia in ('BC', 'FC', 'BR', 'FR'):
        send_msg("%sALARM CLEARED%s: %s" % (green, reset, comment),
                 prefix="Alarm", channels=both)
        grab_img("Entree", channels=both)
    elif sia == 'DG':
        send_msg("%s (%s) %sOK%s on %s (%s)" % (
            data['user'], data['user_id'], green, reset, comment, data['id']))
        grab_door_images(data['id'])
    elif sia in ('DI', 'DD'):
        send_msg("%s (%s) %sDENIED%s on %s (%s)" % (
            data['user'], data['user_id'], red, reset, comment, data['id']))
        grab_door_images(data['id'])
    elif sia == 'CL':
        send_msg("%sGESLOTEN%s door %s" % (red, reset, comment),
                 prefix="Breedpand", channels=both)
        time.sleep(1.5)
        grab_img("Entree", channels=both)
    elif sia == 'OP':
        send_msg("%sGEOPEND%s door %s" % (green, reset, comment),
                 prefix="Breedpand", channels=both)
        time.sleep(1.5)
        grab_img("Entree", channels=both)
    elif sia == 'UF':
        send_msg("%sDOOR FORCED%s: %s" % (red, reset, comment),
                 prefix="Toegang", channels=both)
    elif sia in ('UA', 'UT'):
        send_msg("%sSTORING%s: %s" % (red, reset, comment),
                 prefix="Alarm", channels=both)
    elif sia in ('UR', 'UJ'):
        send_msg("%sSTORING OPGEHEVEN%s: %s" % (green, reset, comment),
                 prefix="Alarm", channels=both)
    elif sia in ('FA', 'FT'):
        # NOTE(review): 'FA' is already consumed by the ALARM branch above,
        # so only 'FT' can reach this point. Decide which message 'FA' should
        # produce and remove it from the other list.
        send_msg("%sBRANDMELDING%s: %s" % (red, reset, comment),
                 prefix="Alarm", channels=both)
        grab_img("Entree", channels=both)
    elif sia in ('FR', 'FJ'):
        # NOTE(review): 'FR' is already consumed by the ALARM CLEARED branch
        # above, so only 'FJ' can reach this point.
        send_msg("%sBRANDMELDING OPGEHEVEN%s: %s" % (green, reset, comment),
                 prefix="Alarm", channels=both)
        grab_img("Entree", channels=both)
    else:
        send_msg("%sUNKNOWN EVENT%s %s with comment %s" % (
            COLORS['yellow'], reset, sia, comment), channels="#testing")


def main():
    """Sniff UDP traffic to port 4004 on ens192 and relay each event to IRC.

    Every new payload is decoded, appended to /var/log/alarm/debug.log,
    parsed and announced. A payload identical to the previous one, or an
    event identical to the previously announced one, is skipped.
    """
    global OLD_DATA
    prev = None
    capture = pyshark.LiveCapture(interface="ens192",
                                  bpf_filter="udp and dst port 4004")
    for packet in capture.sniff_continuously():
        try:
            hex_payload = packet.data.data
        except AttributeError:
            # Packet carries no data layer.
            continue
        line = decode_payload(hex_payload)
        if line is None:
            continue
        if hex_payload == prev:
            continue
        prev = hex_payload

        with open("/var/log/alarm/debug.log", "a") as f:
            stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
            f.write("[%s] %s\n" % (stamp, line))

        data = parse_event(line)
        print(line)
        print(data)

        if not data['sia']:
            # Could not find an event code: forward the raw line for debugging.
            send_msg(line, channels="#testing")
            continue
        if OLD_DATA == data:
            continue
        OLD_DATA = copy.deepcopy(data)

        try:
            announce_event(data)
        except Exception as e:
            # Keep sniffing even if one announcement fails.
            print(e)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment