Skip to content

Instantly share code, notes, and snippets.

@jfinstrom
Created June 19, 2014 01:46
Show Gist options
  • Save jfinstrom/a30c75c8af300f41ad4c to your computer and use it in GitHub Desktop.
Save jfinstrom/a30c75c8af300f41ad4c to your computer and use it in GitHub Desktop.
Config generator for DAHDI, written in Python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# rhino_genconf
# Version label; main() passes it, together with PROG, to OptionParser as the
# text printed by the --version option.
VERSION = "svn"
# Copyright 2012 James Finstrom <jfinstrom@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
import os
import sys
import re
import datetime
from optparse import OptionParser
from optparse import Option, OptionValueError
# ---------------------------------------------------------------------------
# Settings that are probably safe to change
# ---------------------------------------------------------------------------

# When True, extra diagnostic output is printed while the spans are parsed.
DEBUG = False

# Values substituted into the generated Asterisk channel configuration.
parameters = {
    'trunkcontext': "from-trunk",
    'extencontext': "from-internal",
    'defaultgroup': "0",
    't1switch': "national",
    'e1switch': "euroisdn",
}

# Directory whose entries are read to discover the spans (what to parse).
procloc = '/proc/dahdi/'

# Where the kernel level configuration is written.
kconf = '/etc/dahdi/system.conf'

# Where the application level configuration is written.
aconf = '/etc/asterisk/dahdi-channels.conf'

# ---------------------------------------------------------------------------
# Banners placed at the top of the generated files
# ---------------------------------------------------------------------------

# Banner for the application level file (lines are prefixed with ";").
afileheader = """
;This file was autogenerated by the rhino_genconf utility.
;This utility only works with Rhino Hardware
;If you are using cards from another manufacturer you may wish to use
;dahdi_genconf. Any edits to this file will be lost if anyone re-runs
;rhino_genconf or dahdi_genconf
"""

# Banner for the kernel level file (lines are prefixed with "#").
kfileheader = """
#This file was autogenerated by the rhino_genconf utility.
#This utility only works with Rhino Hardware
#If you are using cards from another manufacturer you may wish to use
#dahdi_genconf. Any edits to this file will be lost if anyone re-runs
#rhino_genconf or dahdi_genconf
"""

# ---------------------------------------------------------------------------
# Internal state - things you should probably not touch.... Just sayin
# ---------------------------------------------------------------------------

# Switched to True by main() when command line arguments are present.
CBMODE = False

# Updated by main() while it compares a span against the -c options.
CBSPAN = False

# Script name without directory or extension; shown by --version.
PROG = os.path.basename(
    os.path.splitext(__file__)[0]
)
#Data holder for a single span; main() creates one per entry returned by getcards()
class card(object):
    """Container for what was parsed from one span file.

    The attributes are plain data that the caller fills in while it walks
    the lines of the span file:

    * ``type``        - "analog" or "t1" once a channel line has been seen
    * ``description`` - text taken from the span header line
    * ``spannum``     - name of the span file the data came from
    * ``channels``    - list of dicts with the keys 'lchan' and 'chtype'
    * ``echocan``     - flag set by the caller for analog channels
    """

    def __init__(self):
        self.type = ""
        self.description = ""
        self.spannum = ""
        self.channels = []
        self.echocan = False

    def addChannel(self, c):
        """Append the channel dict *c* to this span."""
        self.channels.append(c)

    def get_ch_count(self):
        """Return how many channels have been added."""
        return len(self.channels)

    def _channel_numbers(self):
        """Return every channel's 'lchan' value converted to int."""
        seq = [int(n['lchan']) for n in self.channels]
        if DEBUG:
            # Single-argument call form prints the same text on Python 2
            # and Python 3.
            print("DEBUG: %r \n" % seq)
        return seq

    def get_last(self):
        """Return the highest channel number.

        Raises ValueError when no channel has been added yet.
        """
        return max(self._channel_numbers())

    def get_first(self):
        """Return the lowest channel number.

        Raises ValueError when no channel has been added yet.
        """
        return min(self._channel_numbers())

    def clearit(self):
        """Reset every attribute to its initial value.

        The channel list is emptied in place, so other references to the
        same list object observe the change as well.
        """
        self.type = ""
        self.description = ""
        self.spannum = ""
        self.channels[:] = []
        self.echocan = False
class MultipleOption(Option):
    """optparse Option that understands an additional "extend" action.

    Every occurrence of an option declared with action="extend" has its
    value appended to a list stored under the option's destination; the
    list is created on first use.  All other actions are handled by the
    stock Option implementation.
    """

    # Register "extend" everywhere optparse looks up action names.
    ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",)
    TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",)
    STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",)
    ACTIONS = Option.ACTIONS + ("extend",)

    def take_action(self, action, dest, opt, value, values, parser):
        """Collect values for "extend"; defer everything else to Option."""
        if action != "extend":
            Option.take_action(self, action, dest, opt, value, values, parser)
            return
        collected = values.ensure_value(dest, [])
        collected.append(value)
# Compatibility shim: only define any() when the interpreter does not
# already provide a built-in of that name.
try:
    any
except NameError:
    def any(s):
        """Return True when at least one element of *s* is truthy."""
        found = False
        for element in s:
            if element:
                found = True
                break
        return found
def getcards(path=None):
    """Return the names of the entries found in the span directory.

    path -- directory to list; defaults to the module-level ``procloc``.

    When the directory cannot be listed the program is terminated through
    sys.exit() with an explanatory message (SystemExit, exit status 1).
    The previous implementation printed a message and then failed with an
    UnboundLocalError because its result variable was never assigned.
    """
    if path is None:
        path = procloc
    try:
        return os.listdir(path)
    except OSError:
        # sys.exc_info() keeps the syntax valid on both old and new Pythons.
        reason = sys.exc_info()[1]
        sys.exit("Unable to list %s (%s). Is the DAHDI driver loaded?"
                 % (path, reason))
def get_proc_file(n, base=None):
    """Return the lines of the span file named *n* as a list of strings.

    n    -- file name inside the span directory
    base -- directory that holds the file; defaults to the module-level
            ``procloc``

    Errors raised by open() are propagated to the caller.
    """
    if base is None:
        base = procloc
    handle = open(os.path.join(base, n))
    try:
        return handle.readlines()
    finally:
        # The original never closed the file; release it deterministically.
        handle.close()
# Channel line patterns keyed by technology, compiled once at import time.
# Raw strings keep the backslash escapes intact for the regex engine.
_CHANNEL_PATTERNS = {
    "analog": re.compile(r'^(\d+)\s*(FX[OS]|---|)/(\d+)/(\d+)'),
    "t1": re.compile(r'^(\d+)\s*(R[124]T1)/(\d+)/(\d+).*'),
}


def parse_channel_line(tech, line):
    """Extract the fields of a channel line.

    tech -- "analog" or "t1"; selects the pattern that is applied
    line -- raw line; surrounding whitespace is stripped before matching

    Returns the re.findall() result: a list holding one 4-tuple of strings
    (channel number, channel type and the two numbers that follow the
    slashes), or an empty list when the line does not match.

    Raises ValueError for an unknown *tech*.  The original failed with an
    UnboundLocalError in that case.
    """
    try:
        pattern = _CHANNEL_PATTERNS[tech]
    except KeyError:
        raise ValueError("unknown channel technology: %r" % (tech,))
    return pattern.findall(line.strip())
def parse_span_line(line):
    """Split a span header line into ``(span, description)``.

    The text before the first colon becomes *span* and everything after it
    becomes *description*; both are stripped of surrounding whitespace.

    Only the first colon separates the two parts, so a description that
    itself contains colons is kept whole (the original cut it at the second
    colon).  A line without any colon yields an empty description instead
    of raising IndexError.
    """
    pieces = line.strip().split(":", 1)
    span = pieces[0].strip()
    if len(pieces) > 1:
        desc = pieces[1].strip()
    else:
        desc = ""
    return span, desc
def linetype(line):
    """Classify one line of a span file.

    Returns
      "empty"    - the line is shorter than five characters
      "header"   - the line contains "Span"
      "achannel" - the line contains FXO, FXS or ---
      "dchannel" - the line contains R1T1, R2T1 or R4T1
      None       - none of the above

    The checks run in the order listed and the first hit wins.
    """
    if len(line) < 5:
        return "empty"
    rules = (
        ("header", ("Span",)),
        ("achannel", ('FXO', 'FXS', '---')),
        ("dchannel", ('R1T1', 'R2T1', 'R4T1')),
    )
    for label, markers in rules:
        for marker in markers:
            if marker in line:
                return label
    return None
def _stanza(context, signalling, group, channels, switchtype=None):
    """Return one Asterisk channel stanza followed by the reset lines."""
    lines = [
        "context = %s \n" % context,
        "signalling = %s \n" % signalling,
        "group = %s \n" % group,
    ]
    if switchtype is not None:
        lines.append("switchtype = %s \n" % switchtype)
    for channel in channels:
        lines.append("channel => %s \n" % channel)
    lines.append(";the next 2 lines are provided to prevent accidental inheritence \n")
    lines.append("group = 50\ncontext = default \n\n")
    return "".join(lines)


def _read_card(name):
    """Parse the span file *name* into a card object."""
    span = card()
    span.spannum = name
    for line in get_proc_file(name):
        kind = linetype(line)
        if kind == "header":
            _label, desc = parse_span_line(line)
            if DEBUG:
                print(desc)
            span.description = desc
        elif kind == "achannel":
            data = parse_channel_line("analog", line)
            if not data:
                # The line mentions an analog marker but is not a channel
                # line; the original crashed with IndexError here.
                continue
            span.type = "analog"
            span.addChannel({'lchan': data[0][0], 'chtype': data[0][1]})
            span.echocan = True
        elif kind == "dchannel":
            data = parse_channel_line("t1", line)
            if not data:
                continue
            span.type = "t1"
            span.addChannel({'lchan': data[0][0], 'chtype': "t1"})
    return span


def _channelbank_config(span, fxo, fxs):
    """Return (kernel, asterisk) text for a T1 span feeding a channel bank.

    *fxo* and *fxs* are module counts.  Each module is taken to provide
    four ports (the factor the original arithmetic used); FXO ports come
    first and FXS ports follow directly after them.
    """
    if fxo <= 0 and fxs <= 0:
        return "", ""
    kernel = ["span = %s,0,0,esf,b8zs \n" % str(span.spannum)]
    asterisk = []
    start = span.get_first()
    if fxo > 0:
        # Inclusive range: n ports starting at `start` end at start + n - 1.
        end = start + (fxo * 4) - 1
        kernel.append("fxsks = %s-%s \n" % (start, end))
        # kickin it with the FXO
        asterisk.append(_stanza(parameters['trunkcontext'], "fxs_ks",
                                parameters['defaultgroup'],
                                ["%s-%s" % (start, end)]))
        start = end + 1
    if fxs > 0:
        end = start + (fxs * 4) - 1
        kernel.append("fxoks = %s-%s \n" % (start, end))
        # rolling on the FXS
        asterisk.append(_stanza(parameters['extencontext'], "fxo_ks", "55",
                                ["%s-%s" % (start, end)]))
    return "".join(kernel), "".join(asterisk)


def _t1_pri_config(span):
    """Return (kernel, asterisk) text for a 24 channel PRI span."""
    first = span.get_first()
    last = span.get_last()
    # The last channel of the span is the D-channel, the rest are B-channels.
    kernel = "".join([
        "span = %s,%s,0,esf,b8zs \n" % (str(span.spannum), str(span.spannum)),
        "bchan = %s-%s \n" % (first, last - 1),
        "dchan = %s \n\n" % last,
    ])
    asterisk = _stanza(parameters['trunkcontext'], "pri_cpe",
                       parameters['defaultgroup'],
                       ["%s-%s" % (first, last - 1)],
                       switchtype=parameters['t1switch'])
    return kernel, asterisk


def _e1_pri_config(span):
    """Return (kernel, asterisk) text for a 31 channel PRI span."""
    first = span.get_first()
    last = span.get_last()
    # On an E1 the signalling timeslot is the 16th channel of the span; the
    # 15 channels on either side of it are B-channels.  The original emitted
    # overlapping ranges and a format string containing "$s".
    dchan = first + 15
    kernel = "".join([
        # Keyword written in lower case like the T1 branches.
        "span = %s,%s,0,ccs,hdb3 \n" % (str(span.spannum), str(span.spannum)),
        "bchan = %s-%s,%s-%s \n" % (first, dchan - 1, dchan + 1, last),
        "dchan = %s \n\n" % dchan,
    ])
    asterisk = _stanza(parameters['trunkcontext'], "pri_cpe",
                       parameters['defaultgroup'],
                       ["%s-%s" % (first, dchan - 1),
                        "%s-%s" % (dchan + 1, last)],
                       switchtype=parameters['e1switch'])
    return kernel, asterisk


def _analog_config(span):
    """Return (kernel, asterisk) text for the channels of an analog span."""
    kernel = []
    asterisk = []
    for channel in span.channels:
        number = channel['lchan']
        kind = channel['chtype']
        if kind == "FXO":
            kernel.append("fxsks = %s \n" % number)
            asterisk.append(_stanza(parameters['trunkcontext'], "fxs_ks",
                                    parameters['defaultgroup'], [number]))
        elif kind == "FXS":
            kernel.append("fxoks = %s \n" % number)
            asterisk.append(_stanza(parameters['extencontext'], "fxo_ks",
                                    "66", [number]))
        elif kind == "---":
            kernel.append("#No Module %s \n" % number)
            asterisk.append(";No module for channel %s \n" % number)
    return "".join(kernel), "".join(asterisk)


def _backup(path):
    """Rename *path* to *path*.bak, reporting (not raising) on failure."""
    real = os.path.realpath(path)
    try:
        os.rename(real, real + ".bak")
    except OSError:
        print("Couldn't make a backup of %s... maybe the file was not there"
              % real)


def _write_config(path, text):
    """Write *text* to *path*, reporting (not raising) on failure."""
    real = os.path.realpath(path)
    try:
        handle = open(real, "w")
        try:
            handle.write(text)
        finally:
            handle.close()
    except (IOError, OSError):
        print("Couldnt write to %s: %s" % (real, sys.exc_info()[1]))


def main():
    """Generate the kernel and Asterisk configuration from the span files.

    Every entry returned by getcards() is parsed and rendered.  A 24 channel
    span named by a ``-c <span> <FXO modules> <FXS modules>`` option is
    rendered as a channel bank, any other 24 channel span as a T1 PRI and a
    31 channel span as an E1 PRI.  Analog spans get one stanza per channel.
    Existing configuration files are renamed to ``*.bak`` before the new
    ones are written.

    The module-level flags CBMODE and CBSPAN are updated as a side effect.
    """
    global CBMODE, CBSPAN
    description = """Tell us about your channel banks. """
    parser = OptionParser(option_class=MultipleOption,
                          usage='usage: %prog [-c <span> <FXO Modules> <FXS Modules>]',
                          version='%s %s' % (PROG, VERSION),
                          description=description)
    parser.add_option('-c', '--channelbank',
                      action="extend", type="string",
                      dest='cbs',
                      metavar='CHANNELBANKS',
                      nargs=3)
    OPTIONS, args = parser.parse_args()

    # span name -> (FXO modules, FXS modules).  cbs is None when -c was never
    # given, which the original tried to iterate.
    channelbanks = {}
    for spec in (OPTIONS.cbs or []):
        try:
            fxo = int(spec[1])
            fxs = int(spec[2])
        except ValueError:
            parser.error("module counts for -c must be integers: %s"
                         % " ".join(spec))
        if fxo < 0 or fxs < 0:
            parser.error("module counts for -c must not be negative: %s"
                         % " ".join(spec))
        channelbanks[spec[0]] = (fxo, fxs)
    CBMODE = bool(channelbanks)

    klines = [kfileheader]
    alines = [afileheader]
    for name in getcards():
        span = _read_card(name)
        # Decided per span, so a match for one span can never leak into the
        # handling of another one.
        CBSPAN = False
        kernel = ""
        asterisk = ""
        if span.type == "t1":
            klines.append("# %s \n" % span.description)
            alines.append("; %s \n" % span.description)
            count = span.get_ch_count()
            if count == 24:
                bank = channelbanks.get(span.spannum)
                if bank is not None:
                    CBSPAN = True
                    fxo, fxs = bank
                    klines.append("#This spans config was altered because someone passed -c %s %s %s \n" % (span.spannum, fxo, fxs))
                    alines.append(";This spans config was altered because someone passed -c %s %s %s \n" % (span.spannum, fxo, fxs))
                    kernel, asterisk = _channelbank_config(span, fxo, fxs)
                else:
                    kernel, asterisk = _t1_pri_config(span)
            elif count == 31:
                kernel, asterisk = _e1_pri_config(span)
        elif span.type == "analog":
            kernel, asterisk = _analog_config(span)
        klines.append(kernel)
        alines.append(asterisk)

    klines.append("loadzone = us\n")
    klines.append("defaultzone = us\n")

    # Each file is backed up and written on its own so that a problem with
    # one of them does not silently skip the other.
    _backup(aconf)
    _backup(kconf)
    _write_config(aconf, "".join(alines))
    _write_config(kconf, "".join(klines))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment