Skip to content

Instantly share code, notes, and snippets.

@moteus moteus/att_xfer_mon.lua
Last active Aug 10, 2017

Embed
What would you like to do?
Monitor transfer channels on FS and put the info into memcache to be able to do intercept
-- This service can be run as standalone application
-- or from FreeSWITCH using `luarun att_xfer_mon.lua`
--------------------------------------------------------
local EventService = require "uv_event_service"
local Memcached = require "lluv.memcached"
-- Attended-transfer monitor built on the lluv-based EventService.
-- It subscribes to CHANNEL_DESTROY events (filtered on
-- `variable_transfer_disposition = recv_replace`) and stores the pair
-- `refer:<refer_uuid>` => `<bridge_uuid>` in memcached.
local service_name = 'att_xfer'

-- Positional entries are host, port and password of the event socket;
-- the named entries are passed through to EventService unchanged.
local esl_options = {
  '127.0.0.1', '8021', 'ClueCon',
  reconnect = 5,
  no_execute_result = true,
  no_bgapi = true,
  subscribe = { 'CHANNEL_DESTROY' },
  filter = { variable_transfer_disposition = 'recv_replace' },
}

local service = EventService.new(service_name, esl_options)
local log = service:logger()

local cache = Memcached.Connection.new({
  server = '127.0.0.1:11211',
  reconnect = 5,
}):open()

-- Connection state of memcached is only reported to the log.
local function on_cache_reconnect()
  log.infof('memcached connected')
end

local function on_cache_disconnect(_, _, err)
  log.infof('memcached disconnected: %s', tostring(err or 'closed'))
end

-- Remember which channel a REFER was bridged to. Events that do not carry
-- both `refer_uuid` and `bridge_uuid` are ignored.
local function on_channel_destroy(_, _, event)
  local refer_uuid = event:getVariable('refer_uuid')
  local bridge_uuid = event:getVariable('bridge_uuid')
  if not (refer_uuid and bridge_uuid) then
    return
  end

  log.debugf('found refer %s=>%s', refer_uuid, bridge_uuid)

  -- third argument is presumably the expiration in seconds -- TODO confirm
  cache:set('refer:' .. refer_uuid, bridge_uuid, 180, function(_, err)
    if err then
      log.errf('can not store key in memcached: %s', tostring(err))
    end
  end)
end

cache:on('reconnect', on_cache_reconnect)
cache:on('disconnect', on_cache_disconnect)
service:on('esl::event::CHANNEL_DESTROY::**', on_channel_destroy)

service:run()
-- implement Attended transfer monitor based on FusionPBX service API
-- without any external deps.
-- Same monitor as a FusionPBX service: every CHANNEL_DESTROY event that has
-- both `variable_refer_uuid` and `variable_bridge_uuid` headers is stored in
-- the FusionPBX cache under the key `refer:<refer_uuid>`.
local service_name = 'att_xfer'

require "resources.functions.config"

local log = require "resources.functions.log"[service_name]
local cache = require "resources.functions.cache"
local BasicEventService = require "resources.functions.basic_event_service"

local service = BasicEventService.new(log, service_name)

-- Event handler; only the event object (third argument) is used.
local function remember_refer(_, _, event)
  local refer_uuid = event:getHeader('variable_refer_uuid')
  local bridge_uuid = event:getHeader('variable_bridge_uuid')
  if not (refer_uuid and bridge_uuid) then
    return
  end

  log.infof('found refer %s=>%s', refer_uuid, bridge_uuid)

  local stored, err = cache.set('refer:' .. refer_uuid, bridge_uuid, 180)
  -- a failure is reported only when the cache also returned an error value
  if (not stored) and err then
    log.errf('can not store key in memcached: %s', tostring(err))
  end
end

service:bind("CHANNEL_DESTROY", remember_refer)

log.notice("start")
service:run()
log.notice("stop")
require 'resources.functions.config'
require 'resources.functions.trim'
local Database = require 'resources.functions.database'
local api = api or freeswitch.API()
--- Read a channel variable through the `uuid_getvar` API command.
-- @param uuid channel uuid
-- @param name variable name
-- @return the raw API reply on success;
--   `false` when the reply is `_undef_`;
--   `nil, reply` when the reply starts with `-ERR`
function channel_variable(uuid, name)
  local command = "uuid_getvar " .. uuid .. " " .. name
  local reply = api:executeString(command)

  local failed = (reply:sub(1, 4) == '-ERR')
  if failed then
    return nil, reply
  end

  if reply ~= '_undef_' then
    return reply
  end

  return false
end
--- Expand an expression in the context of a channel (`eval uuid:<uuid> <cmd>`).
-- Can be used to get some headers like `Call-State` or `Unique-ID`, e.g.
-- `channel_evalute(uuid, '${Unique-ID}')`.
-- @param uuid channel uuid
-- @param cmd expression to evaluate
-- @return the raw API reply on success;
--   `false` when the reply is `_undef_`;
--   `nil, reply` when the reply starts with `-ERR`
function channel_evalute(uuid, cmd)
  local command = ("eval uuid:%s %s"):format(uuid, cmd)
  local reply = api:executeString(command)

  local failed = (reply:sub(1, 4) == '-ERR')
  if failed then
    return nil, reply
  end

  if reply ~= '_undef_' then
    return reply
  end

  return false
end
--- Transfer a channel with the `uuid_transfer` API command.
-- The command is built as `uuid_transfer <uuid> <ext> [<typ> [<ctx>]]`.
-- `typ` and `ctx` are optional: an omitted argument is left out of the
-- command instead of being formatted as the text "nil" (which on Lua 5.1
-- raised an error inside `string.format`).
-- @param uuid channel uuid
-- @param ext destination extension
-- @param typ optional third word of the command (dialplan name in the
--   FreeSWITCH syntax); when only `ctx` is given, `XML` is used so that `ctx`
--   keeps its position -- NOTE(review): confirm `XML` is the wanted default
-- @param ctx optional fourth word of the command (context)
-- @return the trimmed API reply on success;
--   `false` when the reply is `_undef_`;
--   `nil, reply` when the reply starts with `-ERR`
function channel_transfer(uuid, ext, typ, ctx)
  local words = { "uuid_transfer", tostring(uuid), tostring(ext) }

  if typ == nil and ctx ~= nil then
    typ = 'XML'
  end
  if typ ~= nil then
    words[#words + 1] = tostring(typ)
    if ctx ~= nil then
      words[#words + 1] = tostring(ctx)
    end
  end

  local cmd = table.concat(words, " ")
  -- assign to a local first so only the first value returned by `trim` is kept
  local result = trim(api:executeString(cmd))

  if result:sub(1, 4) == '-ERR' then return nil, result end
  if result == '_undef_' then return false end
  return result
end
--- Hang up a channel with the `uuid_kill` API command.
-- @param uuid channel uuid
-- @param cause hangup cause; `CALL_REJECTED` is used when omitted
-- @return `true` when the trimmed reply is exactly `+OK`, `false` otherwise
function channel_kill(uuid, cause)
  local hangup_cause = cause or 'CALL_REJECTED'
  local command = ("uuid_kill %s %s"):format(uuid, hangup_cause)
  local reply = trim(api:executeString(command))
  return reply == '+OK'
end
--- Send display text to a channel with the `uuid_display` API command.
-- The text is wrapped in single quotes as-is (no escaping is applied).
-- @param uuid channel uuid
-- @param text text to display
-- @return the trimmed API reply on success;
--   `false` when the reply is `_undef_`;
--   `nil, reply` when the reply starts with `-ERR`
function channel_display(uuid, text)
  local command = ("uuid_display %s '%s'"):format(uuid, text)
  local reply = trim(api:executeString(command))

  local failed = (reply:sub(1, 4) == '-ERR')
  if failed then
    return nil, reply
  end

  if reply ~= '_undef_' then
    return reply
  end

  return false
end
--- Check whether a channel exists using the `uuid_exists` API command.
-- @param uuid channel uuid
-- @return `true` when the trimmed reply is exactly `true`, `false` for any
--   other non-error reply; `nil, reply` when the reply starts with `-ERR`
function channel_exists(uuid)
  local command = ("uuid_exists %s"):format(uuid)
  local reply = trim(api:executeString(command))

  if reply:sub(1, 4) ~= '-ERR' then
    return reply == 'true'
  end

  return nil, reply
end
-- Cached result of the `switchname` API command (filled on first success).
local _switchname

--- Return the switch name reported by the `switchname` API command.
-- The reply is trimmed, as the other helpers in this file do with API
-- replies, so that stray surrounding whitespace cannot end up inside the
-- `hostname='...'` comparison built by `channels_by_number`.
-- @return the switch name (cached after the first successful call);
--   `false` when the reply is `_undef_`;
--   `nil, reply` when the reply starts with `-ERR` or is empty
local function switchname()
  if _switchname then return _switchname end

  -- assign to a local first so only the first value returned by `trim` is kept
  local result = trim(api:executeString("switchname"))

  if result:sub(1, 4) == '-ERR' then return nil, result end
  if result == '_undef_' then return false end
  -- never cache an empty name: it is truthy and would be returned forever
  if result == '' then return nil, 'empty switchname reply' end

  _switchname = result
  return result
end
--- List channels on this switch that belong to a number.
-- A row of the `channels` table matches when its `hostname` equals the local
-- switch name and either
--   * `context` equals `domain` and `cid_name` or `cid_num` equals `number`, or
--   * `name`, `presence_id` or `presence_data` is LIKE `<number>@<domain>`
--     (`<number>@%` when no domain is given).
-- Rows are ordered by `created_epoch`.
--
-- All values are embedded as SQL string literals with single quotes doubled,
-- so a quote in `number` or `domain` cannot terminate the literal.
-- LIKE wildcards (`%`, `_`) inside the values are NOT escaped.
--
-- @param number extension / caller id number
-- @param domain optional domain name; without it the context/caller-id clause
--   is disabled (it can not match anything without a context to compare with)
-- @return array of rows as returned by `dbh:fetch_all`
-- Raises an error when the switch name can not be determined or the query fails.
function channels_by_number(number, domain)
  local hostname = assert(switchname())

  -- Double single quotes so the value stays inside its SQL string literal.
  local function quote(value)
    return (tostring(value):gsub("'", "''"))
  end

  local full_number = quote(number .. '@' .. (domain or '%'))

  local caller_clause = "1 <> 1"
  if domain then
    caller_clause = ("context = '%s' and (cid_name = '%s' or cid_num = '%s')"):format(
      quote(domain), quote(number), quote(number)
    )
  end

  local sql = ([[select * from channels where hostname='%s' and (
    (%s)
    or name like '%s' or presence_id like '%s' or presence_data like '%s'
    )
    order by created_epoch
  ]]):format(quote(hostname), caller_clause, full_number, full_number, full_number)

  local dbh = Database.new('switch')
  local rows, err = dbh:fetch_all(sql)
  -- release the handle before raising so a failed query does not leak it
  dbh:release()

  assert(rows, err)
  return rows
end
--
-- FusionPBX
-- Version: MPL 1.1
--
-- The contents of this file are subject to the Mozilla Public License Version
-- 1.1 (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
-- http://www.mozilla.org/MPL/
--
-- Software distributed under the License is distributed on an "AS IS" basis,
-- WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
-- for the specific language governing rights and limitations under the
-- License.
--
-- The Original Code is FusionPBX
--
-- The Initial Developer of the Original Code is
-- Mark J Crane <markjcrane@fusionpbx.com>
-- Copyright (C) 2010 - 2016
-- the Initial Developer. All Rights Reserved.
--
-- Contributor(s):
-- Mark J Crane <markjcrane@fusionpbx.com>
-- Errol W Samuels <ewsamuels@gmail.com>
--user defined variables
-- first script argument: extension to intercept; nil selects group intercept
local extension = argv[1];
-- second script argument: which call directions may be picked up.
-- Evaluates as `argv[2] or ((extension and 'inbound') or 'all')`: an explicit
-- argument wins, otherwise 'inbound' when an extension was given, else 'all'.
local direction = argv[2] or extension and 'inbound' or 'all';
-- we can use any number because other box should check sip_h_X_*** headers first
local pickup_number = '*8' -- extension and '**' or '*8'
--include config.lua
require "resources.functions.config";
--add the helper functions used below (explode, split_first, trim, channel_* helpers)
require "resources.functions.explode";
require "resources.functions.split";
require "resources.functions.is_uuid";
require "resources.functions.trim";
require "resources.functions.channel_utils";
--prepare the api object (assigned as a global, not a local)
api = freeswitch.API();
--Get cache class
local cache = require "resources.functions.cache"
--Get intercept logger
local log = require "resources.functions.log".intercept
--include database class
local Database = require "resources.functions.database"
--include json library; it is loaded only when SQL debugging is enabled because
--it is used solely to print query parameters in the debug log lines.
--NOTE(review): `debug` is presumably the settings table defined by config.lua
--(not Lua's standard debug library) -- confirm
local json
if (debug["sql"]) then
json = require "resources.functions.lunajson"
end
--get the hostname of this switch; compared later with the `hostname` column of
--the channels table to decide between a local intercept and a proxied call
local hostname = trim(api:execute("switchname", ""));
-- Redirect the current call to another box by bridging it to
-- `sofia/<profile>/<destination>@<domain_name>;fs_path=sip:<proxy>`.
--
-- `CLUSTER_PEERS[call_hostname]` (when the global table exists) may override
-- the defaults: a string entry is used as the sofia profile, a table entry as
-- `{profile, proxy}`. Defaults are profile `internal` and proxy
-- `call_hostname`.
--
-- The credentials of the calling user (`sip_auth_username` session variable
-- and the password returned by the `user_data` API) are added as channel
-- variables. When the session has no `sip_auth_username` the call is sent
-- without credentials instead of failing on a nil concatenation.
--
-- @param destination number to dial on the other box (without domain)
-- @param call_hostname host name of the box that owns the call
local function make_proxy_call(destination, call_hostname)
  destination = destination .. "@" .. domain_name

  local profile, proxy = "internal", call_hostname
  local peer = CLUSTER_PEERS and CLUSTER_PEERS[proxy]
  if peer then
    if type(peer) == "string" then
      profile = peer
    else
      profile = peer[1] or profile
      proxy = peer[2] or proxy
    end
  end

  local channel_vars = { "sip_invite_domain=" .. domain_name }

  local sip_auth_username = session:getVariable("sip_auth_username")
  if sip_auth_username and #sip_auth_username > 0 then
    local sip_auth_password = api:execute("user_data", sip_auth_username .. "@" .. domain_name .. " param password") or ""
    channel_vars[#channel_vars + 1] = "sip_auth_username=" .. sip_auth_username
    channel_vars[#channel_vars + 1] = "sip_auth_password='" .. sip_auth_password .. "'"
  else
    log.notice("No sip_auth_username on this session. Send call without credentials.")
  end

  -- kept local: the dial string contains the user's password and nothing
  -- outside this function needs it
  local dial_string = "{" .. table.concat(channel_vars, ",") .. "}sofia/" .. profile .. "/" .. destination .. ";fs_path=sip:" .. proxy

  log.notice("Send call to other host....")
  session:execute("bridge", dial_string)
end
-- Ask the caller for a PIN when one is configured.
--
-- `pin_number` may hold several valid PINs separated by commas. The caller is
-- prompted once via `playAndGetDigits`; if the entered digits equal one of the
-- configured PINs, it is stored in the `pin_number` session variable.
--
-- @param pin_number comma separated list of accepted PINs, or nil/false when
--   no PIN is required
-- @return `true` when no PIN is required or a valid PIN was entered;
--   nothing after the failure phrase was played and the session was hung up
local function pin(pin_number)
  if not pin_number then
    return true
  end

  -- short pause before the prompt
  session:sleep(500)

  -- arguments: min digits, max digits, tries, timeout, terminator,
  -- prompt, invalid-input sound (none), accepted input pattern
  local entered = session:playAndGetDigits(
    2, 20, "3", "5000", "#",
    "phrase:voicemail_enter_pass:#", "", "\\d+"
  )

  local accepted_pins = explode(",", pin_number)
  for _, candidate in pairs(accepted_pins) do
    if entered == candidate then
      -- remember which of the configured PINs was used
      session:setVariable("pin_number", candidate)
      return true
    end
  end

  -- not authorized: play a message and hang up
  session:streamFile("phrase:voicemail_fail_auth:#")
  session:hangup("NORMAL_CLEARING")
  return
end
-- Handle an intercept request that was redirected to this box by another one.
--
-- Two SIP headers (read as session variables) are understood:
--   * `X-intercept_uuid`       - uuid of the call to intercept directly;
--   * `X-child_intercept_uuid` - uuid of a child channel; its parent is looked
--     up through the `ent_originate_aleg_uuid`, `cc_member_session_uuid` and
--     `fifo_bridge_uuid` channel variables and intercepted instead. When no
--     parent is found the child itself is intercepted.
--
-- @return `true` when the request was handled here (the caller must stop),
--   nothing when the call carries neither header
local function proxy_intercept()
  -- Check if this call comes from another box with intercept_uuid set
  local intercept_uuid = session:getVariable("sip_h_X-intercept_uuid")
  if intercept_uuid and #intercept_uuid > 0 then
    log.notice("Get intercept_uuid from sip header. Do intercept....")
    session:execute("intercept", intercept_uuid)
    return true
  end

  -- Check if this call comes from another box and we need the parent uuid
  local child_intercept_uuid = session:getVariable("sip_h_X-child_intercept_uuid")
  if (not child_intercept_uuid) or (#child_intercept_uuid == 0) then
    return
  end

  -- search parent uuid
  log.notice("Get child_intercept_uuid from sip header.")
  local parent_uuid =
    channel_variable(child_intercept_uuid, 'ent_originate_aleg_uuid') or
    channel_variable(child_intercept_uuid, 'cc_member_session_uuid') or
    channel_variable(child_intercept_uuid, 'fifo_bridge_uuid') or
    child_intercept_uuid

  if parent_uuid == child_intercept_uuid then
    log.notice("Can not found parent call. Try intercept child.")
    session:execute("intercept", child_intercept_uuid)
    return true
  end

  -- Parent host name. Declared local so it does not overwrite the global
  -- `call_hostname` used by the main part of the script.
  local call_hostname = hostname

  --[[ parent and child have to be on same box so we do not search it
  log.notice("Found parent channel try detect parent hostname")
  local dbh = Database.new('switch')
  local sql = "SELECT hostname FROM channels WHERE uuid='" .. parent_uuid .. "'"
  call_hostname = dbh:first_value(sql)
  dbh:release()

  if not call_hostname then
    log.notice("Can not find host name. Channels is dead?")
    return true
  end
  --]]

  if hostname == call_hostname then
    log.notice("Found parent call on local machine. Do intercept....")
    session:execute("intercept", parent_uuid)
    return true
  end

  -- only reachable when the host name lookup above is enabled
  log.noticef("Found parent call on remote machine `%s`.", call_hostname)
  session:execute("export", "sip_h_X-intercept_uuid=" .. parent_uuid)
  make_proxy_call(pickup_number, call_hostname)
  return true
end
-- Return the array of extensions that share a call group with the caller.
--
-- Reads the globals `domain_uuid` and `caller_id_number`. First the caller's
-- own `call_group` value (a comma separated list) is fetched from
-- `v_extensions`, then every extension of the domain whose `call_group` is
-- LIKE `%<group>%` for one of those groups is collected. An empty group
-- matches extensions whose `call_group` is empty or NULL.
--
-- @return array of strings: `number_alias` when it is set and not empty,
--   otherwise `extension`
local function select_group_extensions()
  -- connect to Fusion database
  local dbh = Database.new('system')

  --get the call groups the extension is a member of
  local sql = "SELECT call_group FROM v_extensions "
    .. "WHERE domain_uuid = :domain_uuid "
    .. "AND (extension = :caller_id_number "
    .. "OR number_alias = :caller_id_number) "
    .. "limit 1"
  local params = {domain_uuid = domain_uuid, caller_id_number = caller_id_number}
  if (debug["sql"]) then
    log.noticef("SQL: %s; params: %s", sql, json.encode(params))
  end

  local call_group = dbh:first_value(sql, params) or ''
  log.noticef("call_group: `%s`", call_group)

  -- local on purpose: nothing outside this function uses the list
  local call_groups = explode(",", call_group)

  --get the extensions in the call groups
  params = {domain_uuid = domain_uuid}
  sql = "SELECT extension, number_alias FROM v_extensions "
    .. "WHERE domain_uuid = :domain_uuid "
    .. "AND ("
  for key, group in ipairs(call_groups) do
    if key > 1 then sql = sql .. " OR " end
    if #group == 0 then
      sql = sql .. "call_group = '' or call_group is NULL"
    else
      -- one named parameter per group keeps the group text out of the SQL
      local param_name = "call_group_" .. tostring(key)
      sql = sql .. "call_group like :" .. param_name
      params[param_name] = '%' .. group .. '%'
    end
  end
  sql = sql .. ") "
  if (debug["sql"]) then
    log.noticef("SQL: %s; params: %s", sql, json.encode(params))
  end

  local extensions = {}
  dbh:query(sql, params, function(row)
    local member = row.extension
    if row.number_alias and #row.number_alias > 0 then
      member = row.number_alias
    end
    extensions[#extensions + 1] = member
    log.noticef("member `%s`", member)
  end)

  -- release Fusion database
  dbh:release()

  return extensions
end
-- Answer the call and load the session variables used by the rest of the
-- script. They are assigned as globals because the helper functions defined
-- above read `domain_uuid`, `domain_name` and `caller_id_number` directly.
if session:ready() then
  session:answer()

  domain_uuid = session:getVariable("domain_uuid")
  domain_name = session:getVariable("domain_name")
  pin_number = session:getVariable("pin_number")
  context = session:getVariable("context")
  caller_id_number = session:getVariable("caller_id_number")
end

-- A request redirected from another box is handled completely by
-- proxy_intercept(); nothing else has to be done in that case.
if session:ready() and proxy_intercept() then
  return
end

-- If a pin number is configured the caller has to enter it first.
if session:ready() and not pin(pin_number) then
  return
end
-- Find the ringing call to pick up.
--
-- Result is left in the globals `uuid` (call to intercept, nil when nothing
-- was found) and `call_hostname` (box that owns the call); the code after
-- this block reads both, so they intentionally stay global.
if session:ready() then
  -- select intercept mode
  local extensions
  if not extension then
    log.notice("GROUP INTERCEPT")
    extensions = select_group_extensions()
  else
    log.noticef("INTERCEPT %s", extension)
    extensions = {extension}
  end

  --connect to FS database
  local dbh = Database.new('switch')

  --check the database to get the uuid of a ringing call
  call_hostname = ""
  local sql = "SELECT uuid, call_uuid, hostname FROM channels "
  sql = sql .. "WHERE callstate IN ('RINGING', 'EARLY') "

  -- next check should prevent pickup call from extension
  -- e.g. if extension 100 dial some cell phone and some one else dial *8
  -- he can pickup this call.
  if not direction:find('all') then
    sql = sql .. "AND (1 <> 1 "
    -- calls from freeswitch to user
    if direction:find('inbound') then
      sql = sql .. "OR direction = 'outbound' "
    end
    -- calls from user to freeswitch
    if direction:find('outbound') then
      sql = sql .. "OR direction = 'inbound' "
    end
    sql = sql .. ") "
  end

  sql = sql .. "AND (1<>1 "
  local params = {}
  for key, member in pairs(extensions) do
    local param_name = "presence_id_" .. tostring(key)
    sql = sql .. "OR presence_id = :" .. param_name .. " "
    params[param_name] = member .. "@" .. domain_name
  end
  sql = sql .. ") "
  sql = sql .. "AND call_uuid IS NOT NULL "
  sql = sql .. "LIMIT 1 "

  if (debug["sql"]) then
    log.noticef("SQL: %s; params: %s", sql, json.encode(params))
  end

  local is_child, channel_uuid
  dbh:query(sql, params, function(row)
    is_child = (row.uuid == row.call_uuid)
    uuid = row.call_uuid
    channel_uuid = row.uuid
    call_hostname = row.hostname
  end)

  -- `uuid` is nil when no ringing channel matched, so it must not be
  -- concatenated directly (that raised an error and aborted the script)
  log.notice("uuid: " .. tostring(uuid))
  log.notice("call_hostname: " .. tostring(call_hostname))

  -- it can be an attended transfer
  if uuid and not is_child and call_hostname == hostname then
    log.info('<<<<< Try detect attended transfer case >>>>>')

    -- test case with attended transfer when transferer hangup, but transfer
    -- target does not answer. In this case a-leg does not exist anymore
    if not channel_exists(uuid) then
      log.info('<<<<< no call channel assume it hangup >>>>>')

      -- external service have to put this info in temporary storage.
      -- It can be done e.g. in xml cdr handler or via Lua script using
      -- EventConsumer or in external ESL service
      local bridge_uuid, err = cache.get('refer:' .. uuid)
      log.info('<<<<< bridge_uuid ' .. tostring(bridge_uuid or err) .. ' >>>>>')

      if not bridge_uuid then
        -- in some cases FS calls `uuid_bridge` app before transfer target
        -- answers, so we can try the `transfer_source` variable to find out
        -- the needed channel.
        -- Format is `<timestamp>:<channel_unique_id>:uuid_br:<channel_uuid>`
        -- or `<timestamp>:<channel_uuid>:uuid_br:<channel_unique_id>`.
        -- Seems we need to handle only the first one.
        log.info('<<<<< try detect uuid_bridge info >>>>>')
        local transfer_source = channel_variable(channel_uuid, 'transfer_source')
        if transfer_source then
          log.info('<<<<< found transfer_source header >>>>>')
          local _, target = split_first(transfer_source, ':uuid_br:', true)
          bridge_uuid = target
          log.info('<<<<< bridge_uuid ' .. tostring(bridge_uuid) .. ' >>>>>')
        end
      end

      uuid = bridge_uuid or uuid
    end
  end

  if is_child then
    -- we need intercept `parent` call e.g. call in FIFO/CallCenter Queue
    if call_hostname == hostname then
      log.notice("Found child call on local machine. Try find parent channel.")
      local parent_uuid =
        channel_variable(uuid, 'ent_originate_aleg_uuid') or
        channel_variable(uuid, 'cc_member_session_uuid') or
        channel_variable(uuid, 'fifo_bridge_uuid') or
        uuid

      -- parent and child have to be on the same box, so the parent host
      -- name is not looked up and the parent is always treated as local
      uuid = parent_uuid
      log.notice("Found parent call on local machine.")
    else
      log.noticef("Found child call on remote machine `%s`.", call_hostname)
      -- we can not find parent on this box because channel is on the other
      -- box, so we have to forward the call to that box
      session:execute("export", "sip_h_X-child_intercept_uuid=" .. uuid)
      -- release the handle here too: this path leaves the script
      dbh:release()
      return make_proxy_call(pickup_number, call_hostname)
    end
  end

  --release FS database
  dbh:release()
end
log.noticef("Hostname: %s Call Hostname: %s", hostname, call_hostname)

-- Intercept the ringing call that was found (global `uuid`), but only while
-- the `billmsec` variable of this session is not set. A call owned by this
-- box is intercepted directly; otherwise the uuid is exported as a SIP header
-- and the session is sent to the box that owns the call.
local may_intercept = (uuid ~= nil) and (session:getVariable("billmsec") == nil)
if may_intercept then
  if hostname ~= call_hostname then
    session:execute("export", "sip_h_X-intercept_uuid=" .. uuid)
    make_proxy_call(pickup_number, call_hostname)
  else
    session:execute("intercept", uuid)
  end
end
--notes
--originate a call
--cmd = "originate user/1007@voip.example.com &intercept("..uuid..")";
--api = freeswitch.API();
--result = api:executeString(cmd);
-- Implement service class for FusionPBX that uses LibUV library to connect to FS.
-- So it can be run from FS like `luarun service.lua` or as standalone process.
--
-- @usage
--
-- local service = EventService.new('blf', { '127.0.0.1', '8021', 'ClueCon',
-- reconnect = 5;
-- subscribe = {'PRESENCE_PROBE'}, filter = { ['Caller-Direction']='inbound' }
-- })
--
-- -- FS receive SUBSCRIBE to BLF from device
-- service:on("esl::event::PRESENCE_PROBE::*", function(self, eventName, event)
-- local proto = event:getHeader('proto')
-- end)
--
-- service:run()
--
local uv = require "lluv"
local ut = require "lluv.utils"
local ESLConnection = require "lluv.esl.connection".Connection
local Logging = require "log"
-- Function that returns a fresh uuid. Inside FreeSWITCH (global `freeswitch`
-- is set) the `create_uuid` API command is used, otherwise the `uuid` module.
-- Only the module needed for the detected environment is required.
local new_uuid = (function()
  if freeswitch then
    local api = require "resources.functions.api"
    return function()
      return api:execute("create_uuid")
    end
  end

  local Uuid = require "uuid"
  return function()
    return Uuid.new()
  end
end)()
-- Create a new pid file and start monitoring it.
-- The file `<pid_path>/<service name>.tmp` is written with a freshly generated
-- uuid. It is then re-read every 30 seconds and whenever the file system
-- watcher fires; the event loop is stopped when the file can not be read or
-- when its content is no longer our uuid (another instance took over).
--
-- @param service service object (provides `logger()` and `name()`)
-- @param pid_path directory for the pid file (created when missing)
-- @return table `{pid = <uuid>, file = <path>, valid = <boolean>}`; `valid`
--   becomes false once a foreign uuid was seen, so the caller knows the file
--   belongs to someone else and must not be removed
local function service_pid_file(service, pid_path)
  local log = service:logger()
  local service_name = service:name()

  -- tiny pool of read buffers so periodic checks do not allocate every time
  local buffers = {
    push = function(self, v)
      self[#self + 1] = v
      return self
    end;
    pop = function(self)
      local v = self[#self];
      self[#self] = nil
      return v
    end;
  }

  -- read up to 1024 bytes of file `p` and call `cb(err)` or `cb(nil, content)`
  local function read_file(p, cb)
    local buf = buffers:pop() or uv.buffer(1024)
    uv.fs_open(p, "r", function(file, err, path)
      if err then
        buffers:push(buf)
        return cb(err)
      end
      file:read(buf, function(file, err, data, size)
        file:close()
        if err then
          buffers:push(buf)
          return cb(err)
        end
        local content = data:to_s(size)
        -- give the buffer back only now that the read is finished and its
        -- content has been copied out; returning it earlier would let another
        -- read reuse the buffer while this one is still in flight
        buffers:push(buf)
        return cb(nil, content)
      end)
    end)
  end

  -- (re)create file `p` with content `data` and call `cb(err)`
  local function write_file(p, data, cb)
    uv.fs_open(p, 'w+', function(file, err, path)
      if err then return cb(err) end
      file:write(data, function(file, err)
        file:close()
        return cb(err)
      end)
    end)
  end

  local uuid = new_uuid()
  local pid_file = pid_path .. "/" .. service_name .. ".tmp"
  local pid = {pid = uuid, file = pid_file, valid = true}

  -- guards against overlapping checks (timer and watcher can fire together)
  local test_in_progress
  local function test_pid_file()
    if test_in_progress then return end
    test_in_progress = true
    read_file(pid.file, function(err, data)
      test_in_progress = false
      if err then
        log.infof('can not read pid file: %s', tostring(err))
        return uv.stop()
      end
      if data ~= pid.pid then
        log.infof('detect launch second instance of service')
        pid.valid = false -- we do not remove it when stop service
        return uv.stop()
      end
    end)
  end

  -- create pid file
  uv.fs_mkdir(pid_path, function(loop, err)
    -- an already existing directory is fine
    if err and err:no() ~= uv.EEXIST then
      log.errf('can not create pid directory: %s', tostring(err))
      return uv.stop()
    end
    write_file(pid.file, pid.pid, function(err)
      if err then
        log.errf('can not create pid file: %s', tostring(err))
        return uv.stop()
      end
      uv.timer():start(30000, 30000, test_pid_file)
      uv.fs_event():start(pid.file, function(_, err, path, ev, ...)
        if err then
          log.warningf('can not start file monitoring')
        end
        return test_pid_file()
      end)
    end)
  end)

  return pid
end
-- Run the service until the event loop stops.
-- Inside FreeSWITCH a pid file is created under `<scripts_dir>/run` first
-- (`scripts_dir` is a global expected from resources.functions.config). An
-- error raised by the loop is logged, not propagated. The pid file is removed
-- afterwards unless it was marked invalid.
-- @param service service object (provides `logger()` and `name()`)
local function service_start(service)
  local log = service:logger()

  local pid_info
  if freeswitch then
    require "resources.functions.config"
    pid_info = service_pid_file(service, scripts_dir .. "/run")
  end

  log.infof('service %s started', service:name())

  local finished, failure = pcall(uv.run)
  if not finished then
    log.errf('%s', tostring(failure))
  end

  log.infof('service %s stopped', service:name())

  local owns_pid_file = pid_info and pid_info.valid
  if owns_pid_file then
    os.remove(pid_info.file)
  end
end
-- Register the listeners that manage the service life cycle:
--   * a CUSTOM `fusion::service::control` event addressed to this service with
--     `service-command: stop` stops the event loop;
--   * a SHUTDOWN event stops the event loop;
--   * ESL connect / disconnect / error notifications are written to the log.
-- @param service service object
-- @return the same service object
local function service_init_loop(service)
  local log = service:logger()

  local function on_control_event(_, _, event)
    local is_control = event:getHeader('Event-Subclass') == 'fusion::service::control'
    if not is_control then
      return
    end

    -- ignore commands addressed to other services
    local addressed_to_us = service:name() == event:getHeader('service-name')
    if not addressed_to_us then
      return
    end

    if event:getHeader('service-command') == "stop" then
      log.infof('receive stop service command')
      return uv.stop()
    end
  end

  local function on_shutdown()
    log.infof('freeswitch shutdown')
    return uv.stop()
  end

  local function on_reconnect()
    log.infof('esl connected')
  end

  local function on_disconnect()
    log.infof('esl disconnected')
  end

  local function on_error(_, _, err)
    log.errf('esl error: %s', tostring(err))
  end

  local function on_close()
    -- nothing to do
  end

  service:on("esl::event::CUSTOM::*", on_control_event)
  service:on("esl::event::SHUTDOWN::*", on_shutdown)
  service:on('esl::reconnect', on_reconnect)
  service:on('esl::disconnect', on_disconnect)
  service:on('esl::error::**', on_error)
  service:on('esl::close', on_close)

  --! @todo way to stop service if it is running not from FS
  -- E.g. using LuaService on Windows and signals on *nix systems

  return service
end
-- Add `v` after the last element of the sequence `t`.
local function append(t, v)
  local last = #t
  t[last + 1] = v
end
-- ESL connection extended with a name, a logger and run/stop helpers.
local EventService = ut.class(ESLConnection)
do
  local super = ut.class.super(EventService)

  -- Build the logger of a service. Inside FreeSWITCH messages go to the
  -- FreeSWITCH log prefixed with `[<service_name>] `, otherwise to stdout.
  -- The `*f` names used across the services are aliased to the plain level
  -- functions when the logger does not provide them itself.
  local function build_logger(service_name)
    local writer
    if freeswitch then
      local prefix_writer = require "log.writer.prefix"
      local fs_writer = require "log.writer.freeswitch"
      writer = prefix_writer.new('[' .. service_name .. '] ', fs_writer.new())
    else
      writer = require "log.writer.stdout".new()
    end

    local formatter = require "log.formatter.pformat".new()
    local log = Logging.new(writer, formatter)

    log.errf = log.errf or log.error
    log.warningf = log.warningf or log.warning
    log.infof = log.infof or log.info
    log.debugf = log.debugf or log.debug

    return log
  end

  -- Constructor.
  -- @param service_name name used for the log prefix and control events
  -- @param params options for the ESL connection; the table is modified in
  --   place: inside FreeSWITCH the control and SHUTDOWN events are added to
  --   `subscribe`, and a non-empty `filter` is extended so that those events
  --   are not filtered out
  -- @return the result of `self:open()`
  function EventService:__init(service_name, params)
    params = params or {}
    params.subscribe = params.subscribe or {}

    if freeswitch then
      append(params.subscribe, 'CUSTOM::fusion::service::control')
      append(params.subscribe, 'SHUTDOWN')

      local has_filter = params.filter and next(params.filter)
      if has_filter then
        params.filter['Event-Subclass'] = 'fusion::service::control'
        params.filter['Event-name'] = 'SHUTDOWN'
      end
    end

    self = super(self, '__init', params)

    self._logger = build_logger(service_name)
    self._service_name = service_name

    service_init_loop(self)

    return self:open()
  end

  -- Run the service; returns when the event loop has stopped.
  function EventService:run()
    service_start(self)
  end

  -- Stop the event loop.
  function EventService:stop()
    uv.stop()
  end

  -- Logger created for this service.
  function EventService:logger()
    return self._logger
  end

  -- Name given to the constructor.
  function EventService:name()
    return self._service_name
  end
end
return EventService
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.