Skip to content

Instantly share code, notes, and snippets.

@ragingcomputer
Created February 3, 2020 23:18
Show Gist options
  • Save ragingcomputer/ddade9fae62afbc92c4ae75e2697e823 to your computer and use it in GitHub Desktop.
Save ragingcomputer/ddade9fae62afbc92c4ae75e2697e823 to your computer and use it in GitHub Desktop.
Python script to run as a cron job on an Icinga2 host. It creates the camera list by querying the Milestone XProtect Corporate Microsoft SQL Server database. Tested only with XProtect 2019.
from pymssql import connect
import hashlib
import ipaddress
import subprocess
# Query returning one row per hardware unit that has at least one enabled
# device.  DevList is the comma-separated names of that unit's enabled devices
# (built with the FOR XML PATH('') concatenation idiom) and DevCount is how
# many there are; rows come back ordered by hardware name.
sql_query = """
SELECT
DISTINCT Dev.IDHardware,
Substring((Select ', ' + DevB.Name From Devices DevB Where DevB.IDHardware=Dev.IDHardware AND DevB.Enabled = 1 For XML Path('')),2,8000) AS DevList,
(Select COUNT(DevB.IDHardware) From Devices DevB Where DevB.IDHardware=Dev.IDHardware AND DevB.Enabled = 1 ) AS DevCount,
Hardware.Name,
Hardware.URI,
Hardware.LoginID,
Hardware.DetectedModel,
Hardware.VendorProductID
FROM Devices Dev
LEFT JOIN Hardware ON Hardware.IDHardware = Dev.IDHardware
WHERE Dev.Enabled = 1
ORDER BY Hardware.Name ASC
"""

# Icinga2 configuration file that this script regenerates.
output_file = "/etc/icinga2/conf.d/ipcams/cameras.conf"

# Connection settings for the "Surveillance" database.
# NOTE(review): host and credentials look like placeholders -- confirm before
# deploying.
_db_settings = {
    "host": "milestone_sql.fq.dn",
    "user": "read_user",
    "password": "read_password",
    "database": "Surveillance",
}
conn = connect(**_db_settings)
def valid_ip(dev_hardware_uri):
    """Tell whether *dev_hardware_uri* is a bare IP address literal.

    Returns True when ``ipaddress.ip_address`` accepts the value (IPv4 or
    IPv6) and False when it rejects it with ``ValueError`` -- for example a
    host name, or an address that still carries a ``:port`` suffix.
    """
    try:
        ipaddress.ip_address(dev_hardware_uri)
    except ValueError:
        return False
    else:
        return True
def sortable_ip(dev_hardware_uri):
    """Return a form of *dev_hardware_uri* that sorts sensibly as text.

    * IPv4 literal -> every octet zero-padded to three digits
      (``"10.1.2.3"`` -> ``"010.001.002.003"``).
    * IPv6 literal -> its exploded form, whose groups are already
      fixed-width (``"::1"`` -> ``"0000:...:0001"``).
    * Anything else -> treated as ``host[:port]``; the part before the first
      ``":"`` is returned unchanged.

    Previously an IPv6 literal failed the four-way octet unpack, and the
    resulting ``ValueError`` was swallowed, so the address was cut at its
    first colon.  The second ``except ValueError`` could never fire because
    ``str.split`` with a non-empty separator does not raise it.
    """
    try:
        dev_addr = ipaddress.ip_address(dev_hardware_uri)
    except ValueError:
        # Not a bare IP literal: keep only the host part of "host:port".
        return dev_hardware_uri.split(":")[0]

    if dev_addr.version == 4:
        return ".".join(octet.zfill(3) for octet in dev_addr.exploded.split("."))

    # IPv6: the exploded notation is already zero-padded per group.
    return dev_addr.exploded
# --- Main flow -------------------------------------------------------------
# 1. Read the enabled-camera inventory from the database.
# 2. Render one Icinga2 Host object per hardware row.
# 3. If the rendered text differs from the file on disk, write it, validate
#    the Icinga2 configuration and reload the service.


def _host_record(address, dev_list):
    """Render a single Icinga2 Host object for the camera at *address*."""
    return "\n".join(
        [
            'object Host "IP Cam - {} ({})" {{'.format(address, dev_list),
            ' import "ip-cam"',
            ' address = "{}"'.format(address),
            "}",
            "",
        ]
    )


# Always release the cursor and the connection, even when the query fails.
try:
    cur = conn.cursor(as_dict=True)
    try:
        cur.execute(sql_query)
        rows = cur.fetchall()
    finally:
        cur.close()
finally:
    conn.close()

record_list = []
for row in rows:
    raw_uri = row["URI"]
    if raw_uri is None:
        # The LEFT JOIN can return a device without a Hardware row; str(None)
        # would otherwise be turned into a bogus "none.fq.dn" host.
        continue

    dev_list = str(row["DevList"]).strip()

    # Reduce the hardware URI to a bare host: drop the scheme and all slashes.
    hardware_uri = (
        str(raw_uri)
        .lower()
        .replace("http://", "")
        .replace("https://", "")
        .replace("/", "")
        .strip()
    )

    # Anything that is not a plain IP literal is treated as a short host name
    # (optionally with a port) and qualified with the domain suffix.
    if not valid_ip(hardware_uri):
        hardware_uri = "{}.fq.dn".format(sortable_ip(hardware_uri))

    record_list.append(_host_record(hardware_uri, dev_list))

records_file = "\n".join(record_list)

# A missing file (first run) is treated as empty instead of crashing.
# The encoding is explicit because cron jobs often run under the C locale.
try:
    with open(output_file, "r", encoding="utf-8") as file_on_disk:
        previous = file_on_disk.read()
except FileNotFoundError:
    previous = ""

if previous != records_file:
    with open(output_file, "w", encoding="utf-8") as out_file:
        out_file.write(records_file)

    result = subprocess.run(["/usr/sbin/icinga2", "daemon", "--validate"])
    if result.returncode == 0:
        subprocess.run(["systemctl", "reload", "icinga2"])
    else:
        # Put the previous content back.  Otherwise the unvalidated file stays
        # on disk and, because it then matches the generated text, later runs
        # would never retry the validate/reload step.
        with open(output_file, "w", encoding="utf-8") as out_file:
            out_file.write(previous)
        raise SystemExit(
            "icinga2 configuration validation failed; restored previous "
            "{}".format(output_file)
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment