Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Asterisk Lua dialplan
require("lsqlite3")
-- Igmar: when do we actually close this DB object?
-- NOTE(review): nothing in the visible source calls db:close(); presumably the
-- handle lives until the Lua state is torn down -- confirm.
db = sqlite3.open('/etc/asterisk/users.sqlite')
--CONSOLE = "Console/dsp" -- Console interface for demo
--CONSOLE = "DAHDI/1"
--CONSOLE = "Phone/phone0"
-- Channel prefix used by every outbound app.Dial() towards the trunk.
TRUNK = "DAHDI/G1"
-- NOTE(review): TRUNKMSD is not referenced anywhere in the visible source.
TRUNKMSD = 1
-- Sent as the `apikey` query parameter of the REST lookups.
APIKEY = "oursecretapikey"
-- Extension handlers. Each e_* function below takes (context, extension) and
-- is wired to a dialplan pattern in the `extensions` table.
-- Drop a call immediately.
-- Hangs up with cause code 29, the same code the other handlers in this file
-- use when they refuse a call.
-- context:   dialplan context the call arrived in (unused)
-- extension: dialed extension (unused)
function e_drop_call(context, extension)
    app.Hangup(29)
end
-- Hot-desk login: bind an extension to the device that dialed '#<extension>'.
-- Steps: refuse disabled or unknown extensions, refuse extensions that already
-- have a location, ask for the PIN, store the device's peer name as the
-- extension's location, then add the caller to the extension's queue (if any).
-- context:   dialplan context the call arrived in
-- extension: dialed string; its first character is stripped to get the
--            extension number
function e_hotdesk_login(context, extension)
    -- Find out the peername
    local peername = channel.CHANNEL("peername"):get()
    extension = string.sub(extension, 2)

    -- `not` instead of `== false`: get_extension_status() yields nil when no
    -- row matches, and an unknown extension must be refused as well.
    if not get_extension_status(extension) then
        app.Playback("acces-denied")
        app.Verbose(1, "Extension " .. extension .. " disabled")
        app.Hangup(29)
        return
    end

    -- Already logged in ?
    if extension_to_location(extension) ~= nil then
        app.Verbose(1, "Extension " .. extension .. " already logged in")
        app.Playback("agent-alreadyon")
        app.Hangup(29)
        return
    end

    app.Playback("please-enter-the&access-code")
    app.Verbose(1, "Logging in at the PBX context : " .. context .. " extension " .. extension .. " peername " .. peername)

    -- Ask the PIN
    local userpin = get_extension_pin(extension)
    app.Read("pin", "", 4)
    local pin = channel["pin"]:get()
    -- A missing stored PIN must never match: nil == nil would let anyone in.
    if userpin == nil or pin ~= userpin then
        app.Playback("acces-denied")
        app.Hangup(29)
        return
    end
    app.Verbose(1, "PIN OK")

    -- Update the location field. Only the first result of db:exec() is kept;
    -- a second assignment target named `error` would overwrite the global
    -- error() function.
    local res = db:exec(string.format("UPDATE ast_users SET location = '%s' WHERE extension = %s", peername, extension))
    if res ~= 0 then
        app.Verbose(1, string.format("UPDATE on ast_users failed : %d", res))
        app.Hangup(29)
        return
    end
    app.Playback("agent-loginok")

    -- Put the user in the queue
    local queue = get_extension_queue(extension)
    if queue ~= nil then
        app.Verbose(1, string.format("Adding member to queue : %s", queue))
        app.AddQueueMember(queue)
    end
end
-- Hot-desk logout: release whichever extension is logged in at the calling
-- device and remove it from its queue.
-- context:   dialplan context the call arrived in
-- extension: dialed string; ignored, the extension is looked up from the
--            device's peer name instead
function e_hotdesk_logout(context, extension)
    local peername = channel.CHANNEL("peername"):get()
    extension = location_to_extension(peername)
    if extension == nil then
        app.Verbose(1, "Location " .. peername .. " is not logged in")
        app.Hangup(29)
        return
    end
    app.Verbose(1, "Logging out at the PBX context : " .. context .. " extension " .. extension .. " peername " .. peername)

    -- Keep only the first result of db:exec(); assigning a second result to
    -- `error` would overwrite the global error() function.
    local res = db:exec(string.format("UPDATE ast_users SET location = null WHERE extension = %s", extension))
    if res ~= 0 then
        app.Verbose(1, "UPDATE on ast_users failed")
        app.Hangup(29)
        return
    end
    app.Playback("agent-loggedoff")

    -- Extensions without a queue have nothing to be removed from.
    local queue = get_extension_queue(extension)
    if queue ~= nil then
        app.Verbose(1, string.format("Removing member from queue : %s", queue))
        app.RemoveQueueMember(queue)
    end
    app.Answer()
end
-- Outgoing call over the trunk; only allowed from a device that has an
-- extension logged in.
-- context:   dialplan context the call arrived in
-- extension: number to dial, passed to the trunk unchanged
function e_outgoing_sip(context, extension)
    local peername = channel.CHANNEL("peername"):get()
    app.Verbose(1, "Called outgoing " .. extension .. " in context " .. context)

    -- Already logged in ?
    if location_to_extension(peername) == nil then
        app.Verbose(1, "Location " .. peername .. " not logged in")
        app.Playback("acces-denied")
        app.Hangup(29)
        -- Never fall through to Dial for a device that is not logged in.
        return
    end
    app.Dial(TRUNK .. "/" .. extension)
end
-- Emergency call: always dials 112 over the trunk, whichever extension this
-- handler was reached through. No login check is performed.
-- context:   dialplan context the call arrived in
-- extension: dialed emergency number (only logged)
function e_outgoing_emergency(context, extension)
    app.Verbose(1, "Emergency dial " .. extension .. " in context " .. context)
    app.Dial(TRUNK .. "/112")
end
-- Internal call between two hot-desk extensions.
-- Refuses the call when the calling device is not logged in or dials its own
-- extension, resolves the dialed extension to the device it is logged in at,
-- rewrites the caller ID to the caller's name and extension, then dials the
-- device over SIP.
-- context:   dialplan context the call arrived in
-- extension: dialed internal extension
function e_internal_sip(context, extension)
    app.Verbose(1, "Called internal extension " .. extension .. " in context " .. context)

    -- Is this device logged on ?
    local name = channel.CALLERID("name"):get()
    local peername = channel.CHANNEL("peername"):get()
    -- Despite its name this holds the caller ID number of the calling party;
    -- it is looked up as a location below.
    local callee = channel.CALLERID("num"):get()
    app.Verbose(1, string.format("Device name : %s, callee %s", tostring(name), tostring(callee)))

    local callee_extension, callee_name
    if callee ~= nil then
        callee_extension = location_to_extension(callee)
        callee_name = location_to_name(callee)
    end

    -- peername is nil on a direct call transfer. Handle this
    if peername ~= nil then
        -- What is my own extension number ? Find it.
        local my_extension = location_to_extension(peername)
        if my_extension == nil then
            app.Verbose(1, "Location " .. peername .. " not logged in")
            app.Playback("acces-denied")
            app.Hangup(26)
            return
        end
        if my_extension == extension then
            app.Verbose(1, "We called outselves. Bad")
            app.Playback("all-your-base")
            app.Hangup(26)
            return
        end
    end

    local dialed_location = extension_to_location(extension)
    if dialed_location == nil then
        app.Verbose(1, "Extension " .. extension .. " not active")
        app.Playback("extension&T-is-not-available")
        app.Hangup(20)
        return
    end

    if callee_extension ~= nil then
        app.Verbose(1, string.format("Callee extension : %s", callee_extension))
        -- A user without a first name gets an empty display name.
        channel.CALLERID("all"):set(string.format("%s <%s>", callee_name or "", callee_extension))
    end
    app.Dial("SIP/" .. dialed_location)
end
-- Check that the calling device has an extension logged in.
-- Plays the denial prompt and hangs up when it has not.
-- Returns true when the caller may proceed, false otherwise.
local function speed_dial_allowed()
    local peername = channel.CHANNEL("peername"):get()
    app.Verbose(1, string.format("Peername : %s", tostring(peername)))
    if peername == nil or location_to_extension(peername) == nil then
        app.Verbose(1, "Location " .. tostring(peername) .. " not logged in")
        app.Playback("acces-denied")
        app.Hangup(29)
        return false
    end
    return true
end

-- Ask the REST service for the phone number behind `key` and dial it.
-- cmd:   value of the `cmd` query parameter ("custinfo" / "emplinfo")
-- key:   value of the `arg1` query parameter
-- label: word used in the log lines ("customer" / "employee")
local function dial_rest_number(cmd, key, label)
    local https = require('ssl.https')
    -- Build the URL
    -- NOTE(review): the logged URL contains APIKEY in clear text.
    local url = string.format("https://X.X.X.X/rest/?apikey=%s&cmd=%s&arg1=%s", APIKEY, cmd, key)
    app.Verbose(1, string.format("Requesting %s", url))

    -- On failure the request yields nil instead of a body string.
    local body = https.request(url)
    local name, number
    if type(body) == "string" then
        -- The last "name:number" pair in the reply wins.
        for s1, s2 in string.gmatch(body, "([%w%s]+):([%w%s]+)") do
            name, number = s1, s2
        end
    end

    if number == nil then
        -- Nothing usable came back; never dial a number left over from an
        -- earlier lookup. Cause 1 is Q.850 "unallocated number".
        app.Verbose(1, string.format("No number found for %s %s", label, key))
        app.Hangup(1)
        return
    end
    app.Verbose(1, string.format("Data for %s %s : %s %s", label, key, name, number))
    app.Dial(TRUNK .. "/" .. number)
end

-- Speed dial a customer: '*<customer number>'.
-- context:   dialplan context the call arrived in
-- extension: dialed string; the leading '*' is stripped
function e_dial_custnr(context, extension)
    if not speed_dial_allowed() then
        return
    end
    -- Ditch the initial *
    extension = string.sub(extension, 2)
    app.Verbose(1, "Entering dial_custnr_phone")
    dial_rest_number("custinfo", extension, "customer")
end

-- Speed dial an employee: '**<employee number>'.
-- context:   dialplan context the call arrived in
-- extension: dialed string; the leading '**' is stripped
function e_dial_employee(context, extension)
    if not speed_dial_allowed() then
        return
    end
    extension = string.sub(extension, 3)
    app.Verbose(1, string.format("Dialing employee %s", extension))
    dial_rest_number("emplinfo", extension, "employee")
end
-- Incoming call from the trunk.
-- Outside office hours the caller hears the custom announcement and is hung
-- up. Otherwise the caller's number is looked up over REST to get a display
-- name, the caller ID is rewritten, and the call is offered to queue q1 and
-- then q2 before being hung up.
-- context:   dialplan context the call arrived in
-- extension: number that was called (only logged)
function e_incoming(context, extension)
    -- Check if we are open
    if not in_office_hours() then
        app.Playback("/var/lib/asterisk/sounds/custom/beantwoorder")
        app.Hangup(16)
        return
    end

    local callee = channel.CALLERID("num"):get()
    app.Verbose(1, string.format("Received call for %s from %s", extension, tostring(callee)))

    -- Without a caller number there is nothing to look up or rewrite; the
    -- call still goes to the queues.
    if callee ~= nil then
        local https = require('ssl.https')
        -- NOTE(review): the logged URL contains APIKEY in clear text.
        local url = string.format("https://X.X.X.X/rest/?apikey=%s&cmd=numinfo&arg1=%s", APIKEY, callee)
        app.Verbose(1, string.format("Requesting %s", url))

        -- A failed request yields nil instead of a body; the caller then
        -- simply gets an empty display name.
        local body = https.request(url)
        local name = ""
        if type(body) == "string" then
            -- The last "name:number" pair in the reply wins.
            for s1, _ in string.gmatch(body, "([%w%s]+):([%w%s]+)") do
                name = s1
            end
        end
        channel.CALLERID("all"):set(string.format("%s <%s>", name, '0' .. callee))
    end

    app.Queue("q1", "tTrn", "", "", 12)
    app.Queue("q2", "tTr", "", "", 28)
    app.Hangup(17)
end
-- Helper functions

-- Read one column from the first ast_users row whose `key_column` equals
-- `key_value`.
-- wanted, key_column: column names; always literals supplied by the helpers
--                     below, never caller data
-- key_value:          value to match; bound as a statement parameter so it is
--                     never spliced into the SQL text
-- Returns value, found: `value` is the column value (nil for SQL NULL) and
-- `found` tells whether a row matched at all.
-- Raises an error when the statement cannot be prepared or stepped.
local function fetch_user_column(wanted, key_column, key_value)
    if key_value == nil then
        return nil, false
    end
    local stmt = db:prepare(string.format("SELECT %s FROM ast_users WHERE %s = ? LIMIT 1", wanted, key_column))
    if not stmt then
        error("ast_users lookup failed: " .. tostring(db:errmsg()))
    end
    stmt:bind_values(key_value)
    local rc = stmt:step()
    local value
    if rc == sqlite3.ROW then
        value = stmt:get_value(0)
    end
    -- Release the statement right away instead of leaving it to the GC.
    stmt:finalize()
    if rc ~= sqlite3.ROW and rc ~= sqlite3.DONE then
        error("ast_users lookup failed: " .. tostring(db:errmsg()))
    end
    return value, rc == sqlite3.ROW
end

-- Like fetch_user_column(), but returns the value as a string, or nil when no
-- row matched or the column is NULL.
local function fetch_user_string(wanted, key_column, key_value)
    local value = fetch_user_column(wanted, key_column, key_value)
    if value == nil then
        return nil
    end
    return tostring(value)
end

-- Location (device) the extension is logged in at, or nil.
function extension_to_location(extension)
    return fetch_user_string("location", "extension", extension)
end

-- Extension logged in at the given location, or nil.
function location_to_extension(location)
    return fetch_user_string("extension", "location", location)
end

-- First name of the user logged in at the given location, or nil.
function location_to_name(location)
    return fetch_user_string("first_name", "location", location)
end

-- true when the extension's status column is 1, false for any other status,
-- nil when the extension does not exist.
function get_extension_status(extension)
    local status, found = fetch_user_column("status", "extension", extension)
    if not found then
        return nil
    end
    return status == 1
end

-- PIN of the extension as a string, or nil when the extension does not exist
-- or has no PIN.
function get_extension_pin(extension)
    return fetch_user_string("pin", "extension", extension)
end

-- Queue of the extension as a string, or nil when the extension does not
-- exist or has no queue.
function get_extension_queue(extension)
    return fetch_user_string("queue", "extension", extension)
end
-- Fixed-date holidays on which the office is closed, as { month, day }.
local office_holidays = {
    { 4, 30 },  -- Queen's Day
    { 5, 5 },   -- Liberation Day
    { 12, 25 }, -- Christmas Day
    { 12, 26 }, -- Second Christmas Day
    { 12, 31 }, -- New Year's Eve
}

-- Tell whether the office is open.
-- Open means: Monday to Friday, from 08:30 up to (not including) 17:00, and
-- not on one of the fixed-date holidays above.
-- now: optional table with wday, hour, min, month and day fields, as returned
--      by os.date("*t"); defaults to the current local time
-- Returns true when open, false otherwise.
function in_office_hours(now)
    local t = now or os.date("*t")

    -- Weekend: wday 1 is Sunday, 7 is Saturday
    if t.wday == 1 or t.wday == 7 then
        return false
    end

    -- Outside 08:30 - 17:00
    local minutes = t.hour * 60 + t.min
    if minutes < 8 * 60 + 30 or minutes >= 17 * 60 then
        return false
    end

    for _, holiday in ipairs(office_holidays) do
        if t.month == holiday[1] and t.day == holiday[2] then
            return false
        end
    end
    return true
end
-- Dialplan table: for every context, the extension patterns it serves and the
-- handler function that runs for each of them.
extensions = {
-- Every extension dialed in this context is dropped
["unauthenticated"] = {
["_."] = e_drop_call
};
-- Desk phones
["sipphones"] = {
-- '#' followed by an extension number: hot-desk login
["_#NXX"] = e_hotdesk_login,
-- '#000': hot-desk logout
["_#000"] = e_hotdesk_logout,
-- Leading 0: outgoing call over the trunk
["_0X."] = e_outgoing_sip,
-- Three digits: internal call to another extension
["_NXX"] = e_internal_sip,
-- Both emergency numbers end up dialing 112 over the trunk
["112"] = e_outgoing_emergency,
["911"] = e_outgoing_emergency,
-- '*' + customer number / '**' + employee number: dial via REST lookup
-- NOTE(review): a string starting with '**' matches both patterns below;
-- presumably Asterisk picks the more specific '_**.' -- confirm.
["_*."] = e_dial_custnr,
["_**."] = e_dial_employee
};
-- Incoming calls
["bri-incoming"] = {
["_X."] = e_incoming
};
-- Everything else is treated like "unauthenticated"
["default"] = {
include = { "unauthenticated" }
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.