Skip to content

Instantly share code, notes, and snippets.

@Sophrinix
Forked from zzzeek/overlap_cidr.py
Created March 10, 2017 01:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sophrinix/31ad4c77c821bdf76e338b3dc44f56d8 to your computer and use it in GitHub Desktop.
Save Sophrinix/31ad4c77c821bdf76e338b3dc44f56d8 to your computer and use it in GitHub Desktop.
detect overlapping CIDR ranges in SQL, prevent duplicate ranges using triggers
from sqlalchemy import event
from sqlalchemy import DDL
def mysql_cidr_overlap(engine, metadata):
    """Arrange for the ``cidr_overlap`` SQL function to be created on MySQL.

    Registers an ``after_create`` listener on ``metadata``.  When the
    listener fires against a MySQL / MariaDB connection it drops any
    existing ``cidr_overlap`` stored function and creates it again; on any
    other dialect it does nothing.

    ``cidr_overlap(cidr1, cidr2)`` takes two ``'a.b.c.d/n'`` strings and
    returns 1 when both network addresses agree on their first
    ``least(n1, n2)`` bits, 0 otherwise.

    :param engine: not used by this function; kept so existing callers that
        pass it keep working.
    :param metadata: the ``MetaData`` whose ``after_create`` event triggers
        creation of the stored function.
    """

    @event.listens_for(metadata, "after_create")
    def _create_mysql_proc(target, connection, **kw):
        # NOTE(review): newer SQLAlchemy versions are believed to report
        # "mariadb" as the dialect name for mariadb:// URLs - confirm.
        if connection.engine.name not in ("mysql", "mariadb"):
            return

        # IF EXISTS replaces the former INFORMATION_SCHEMA lookup, and DDL()
        # avoids passing a plain string to execute().
        connection.execute(DDL("DROP FUNCTION IF EXISTS cidr_overlap"))

        connection.execute(
            DDL("""
            CREATE FUNCTION cidr_overlap (cidr1 VARCHAR(30), cidr2 VARCHAR(30))
            RETURNS TINYINT
            DETERMINISTIC
            NO SQL
            BEGIN
                -- BIGINT UNSIGNED: a /0 prefix yields 2^32 - 1, which does
                -- not fit in a signed INT.
                DECLARE bitmask BIGINT UNSIGNED;

                -- note - the math here is still untested; needs tests
                -- before production use.
                -- bitmask covers the host bits of the shorter prefix.
                SET bitmask = pow(
                    2,
                    (32 - least(
                        cast(substring_index(cidr1, '/', -1) as signed),
                        cast(substring_index(cidr2, '/', -1) as signed)
                    ))
                ) - 1;

                RETURN
                    inet_aton(substring_index(cidr1, '/', 1)) & ~bitmask =
                    inet_aton(substring_index(cidr2, '/', 1)) & ~bitmask;
            END
            """)
        )
if __name__ == '__main__':
    # Demo: create a table guarded by an insert trigger that rejects
    # overlapping subnets, insert a few rows, then query with cidr_overlap().
    from sqlalchemy import Column, Integer, String, create_engine, func
    from sqlalchemy.exc import DBAPIError
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session, aliased

    Base = declarative_base()

    class A(Base):
        __tablename__ = 'a'

        id = Column(Integer, primary_key=True)
        subnet = Column(String(30))

    # The trigger is emitted right after table "a" is created.  It only
    # guards INSERT; an UPDATE of an existing row is not checked.
    event.listen(
        A.__table__, "after_create",
        DDL("""
        CREATE TRIGGER no_overlap_cidr_a
        BEFORE INSERT ON a
        FOR EACH ROW
        BEGIN
            DECLARE msg VARCHAR(200);
            IF (EXISTS(SELECT * FROM a WHERE cidr_overlap(subnet, NEW.subnet))) THEN
                SET msg = CONCAT(
                    'inserted subnet ', NEW.subnet,
                    ' conflicts with existing subnets');
                SIGNAL sqlstate '45000'
                    SET MESSAGE_TEXT = msg;
            END IF;
        END
        """)
    )

    engine = create_engine("mysql://scott:tiger@localhost/test", echo=True)

    # Must be registered before create_all() so the stored function gets
    # created along with the tables.
    mysql_cidr_overlap(engine, Base.metadata)

    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    s = Session(engine)

    # Each insert runs in its own SAVEPOINT so a rejected row can be rolled
    # back without discarding the earlier ones.
    with s.begin_nested():
        s.add(A(subnet='192.168.1.0/24'))

    with s.begin_nested():
        s.add(A(subnet='192.168.2.0/24'))

    try:
        # Overlaps 192.168.2.0/24, so the trigger is expected to reject it.
        with s.begin_nested():
            s.add(A(subnet='192.168.2.0/25'))
    except DBAPIError as err:
        # Named "err" so the handler does not rebind the engine variable.
        print("Error! %s" % err)

    s.commit()

    a1, a2 = aliased(A), aliased(A)

    # return all non-overlapping CIDR pairs
    non_overlapping = s.query(a1.subnet, a2.subnet).\
        filter(~func.cidr_overlap(a1.subnet, a2.subnet)).\
        filter(a1.id > a2.id)
    for a, b in non_overlapping:
        # Single-argument print() behaves the same on Python 2 and 3.
        print("%s %s" % (a, b))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment