Skip to content

Instantly share code, notes, and snippets.

@Sophrinix
Forked from zzzeek/overlap_cidr.py
Created March 10, 2017 01:03
Embed
What would you like to do?
Detect overlapping CIDR ranges in SQL, and use triggers to prevent overlapping ranges from being inserted.
from sqlalchemy import event
from sqlalchemy import DDL
def mysql_cidr_overlap(engine, metadata):
    """Install the ``cidr_overlap`` SQL function whenever *metadata* is created.

    Registers an ``after_create`` listener on *metadata*.  When the listener
    fires on a connection whose engine name is ``'mysql'`` it drops any
    existing ``cidr_overlap`` function in the current database and recreates
    it; on any other backend it returns without doing anything.

    The SQL function ``cidr_overlap(cidr1, cidr2)`` takes two strings of the
    form ``'a.b.c.d/prefix'`` and compares the two addresses after clearing
    the host bits of the *shorter* of the two prefixes, i.e. it reports
    whether both networks agree on their common leading bits.

    NOTE: the SQL has no automated tests; verify the math against your own
    data before relying on it.

    :param engine: unused; retained so existing callers keep working.
    :param metadata: the ``MetaData`` object whose ``after_create`` event
        triggers installation of the function.
    """

    @event.listens_for(metadata, "after_create")
    def _create_mysql_proc(target, connection, **kw):
        # The function body is MySQL-specific; skip every other backend.
        if connection.engine.name != 'mysql':
            return

        # IF EXISTS makes the drop idempotent, so no separate lookup in
        # INFORMATION_SCHEMA.ROUTINES is needed before recreating.
        connection.execute(DDL("DROP FUNCTION IF EXISTS cidr_overlap"))

        # Changes relative to a naive implementation:
        #  * bitmask is BIGINT: a /0 prefix gives 2^32 - 1, which does not
        #    fit into a signed 32-bit INT.
        #  * CAST(... AS SIGNED) is the integer cast target MySQL documents.
        #  * DETERMINISTIC / NO SQL let the function be created when binary
        #    logging is enabled.
        connection.execute(
            DDL("""
            CREATE FUNCTION cidr_overlap (cidr1 VARCHAR(30), cidr2 VARCHAR(30))
            RETURNS TINYINT
            DETERMINISTIC
            NO SQL
            BEGIN
                DECLARE bitmask BIGINT;

                -- host-bit mask of the shorter (less specific) prefix
                SET bitmask = pow(
                    2,
                    (32 - least(
                        cast(substring_index(cidr1, '/', -1) as signed),
                        cast(substring_index(cidr2, '/', -1) as signed)
                    ))
                ) - 1;

                RETURN
                    (inet_aton(substring_index(cidr1, '/', 1)) & ~bitmask) =
                    (inet_aton(substring_index(cidr2, '/', 1)) & ~bitmask);
            END
            """)
        )
if __name__ == '__main__':
    # Demo: create a table with a BEFORE INSERT trigger that rejects subnets
    # overlapping an existing row, insert a few subnets (one of which is
    # expected to be rejected), then list the non-overlapping pairs.
    from sqlalchemy import Column, Integer, String, create_engine, func
    from sqlalchemy import event
    from sqlalchemy.exc import DBAPIError
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session, aliased

    Base = declarative_base()

    class A(Base):
        __tablename__ = 'a'

        id = Column(Integer, primary_key=True)
        subnet = Column(String(30))

    # The trigger only guards INSERT; an UPDATE of ``subnet`` is not checked.
    # NOTE(review): the trigger calls cidr_overlap(), which is installed by
    # the metadata-level listener; presumably the function only has to exist
    # by the time the trigger first fires -- confirm against MySQL.
    event.listen(
        A.__table__, "after_create",
        DDL("""
        CREATE TRIGGER no_overlap_cidr_a
        BEFORE INSERT ON a
        FOR EACH ROW
        BEGIN
            DECLARE msg VARCHAR(200);
            IF (EXISTS(SELECT * FROM a WHERE cidr_overlap(subnet, NEW.subnet))) THEN
                SET msg = CONCAT(
                    'inserted subnet ', NEW.subnet,
                    ' conflicts with existing subnets');
                SIGNAL sqlstate '45000'
                SET MESSAGE_TEXT = msg;
            END IF;
        END
        """)
    )

    e = create_engine("mysql://scott:tiger@localhost/test", echo=True)
    mysql_cidr_overlap(e, Base.metadata)

    Base.metadata.drop_all(e)
    Base.metadata.create_all(e)

    s = Session(e)

    # Each insert runs in its own SAVEPOINT so that a rejected row does not
    # roll back the rows inserted before it.
    with s.begin_nested():
        s.add(A(subnet='192.168.1.0/24'))

    with s.begin_nested():
        s.add(A(subnet='192.168.2.0/24'))

    try:
        with s.begin_nested():
            # Lies inside 192.168.2.0/24, so the trigger should reject it.
            s.add(A(subnet='192.168.2.0/25'))
    except DBAPIError as err:
        # Bound to ``err`` rather than ``e`` so the engine is not shadowed.
        print("Error! %s" % err)

    s.commit()

    a1, a2 = aliased(A), aliased(A)

    # return all non-overlapping CIDR pairs; ``a1.id > a2.id`` lists each
    # unordered pair once and excludes a row paired with itself
    pairs = (
        s.query(a1.subnet, a2.subnet)
        .filter(~func.cidr_overlap(a1.subnet, a2.subnet))
        .filter(a1.id > a2.id)
    )
    for a, b in pairs:
        # Single-argument form prints identically on Python 2 and 3.
        print("%s %s" % (a, b))

    s.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment