Skip to content

Instantly share code, notes, and snippets.

@moteus
Last active August 10, 2017 13:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save moteus/6ab6457e07d3825eb2780c04caee38ca to your computer and use it in GitHub Desktop.
Save moteus/6ab6457e07d3825eb2780c04caee38ca to your computer and use it in GitHub Desktop.
Monitor transfer channels on FS and put info into memcached to be able to do intercept
-- This service can be run as standalone application
-- or from FreeSWITCH using `luarun att_xfer_mon.lua`
--------------------------------------------------------
local EventService = require "uv_event_service"
local Memcached = require "lluv.memcached"
-- Name under which this service registers itself with EventService.
local service_name = 'att_xfer'

-- ESL connection settings: positional host / port / password followed by
-- named options. Only CHANNEL_DESTROY events whose
-- `variable_transfer_disposition` equals `recv_replace` are requested.
local esl_options = {
  '127.0.0.1', '8021', 'ClueCon',
  reconnect         = 5,
  no_execute_result = true,
  no_bgapi          = true,
  subscribe         = { 'CHANNEL_DESTROY' },
  filter            = { variable_transfer_disposition = 'recv_replace' },
}

local service = EventService.new(service_name, esl_options)
local log = service:logger()

-- Memcached connection used to publish `refer:<uuid>` keys.
local memcached_options = { server = '127.0.0.1:11211', reconnect = 5 }
local cache = Memcached.Connection.new(memcached_options):open()

-- Log memcached connection state changes.
local function on_cache_reconnect(self, eventName)
  log.infof('memcached connected')
end

local function on_cache_disconnect(self, eventName, err)
  log.infof('memcached disconnected: %s', tostring(err or 'closed'))
end

-- Result callback of `cache:set`; only failures are reported.
local function on_key_stored(self, err, status)
  if err then
    log.errf('can not store key in memcached: %s', tostring(err))
  end
end

-- When a destroyed channel carries both `refer_uuid` and `bridge_uuid`,
-- store `refer:<refer_uuid>` => `<bridge_uuid>` with an expiry argument of 180.
local function on_channel_destroy(self, eventName, event)
  local refer_uuid  = event:getVariable('refer_uuid')
  local bridge_uuid = event:getVariable('bridge_uuid')
  if not (refer_uuid and bridge_uuid) then
    return
  end
  log.debugf('found refer %s=>%s', refer_uuid, bridge_uuid)
  cache:set('refer:' .. refer_uuid, bridge_uuid, 180, on_key_stored)
end

cache:on('reconnect', on_cache_reconnect)
cache:on('disconnect', on_cache_disconnect)
service:on('esl::event::CHANNEL_DESTROY::**', on_channel_destroy)

service:run()
-- implement Attended transfer monitor based on FusionPBX service API
-- without any external deps.
-- Name of the service; also selects the logger.
local service_name = 'att_xfer'

require "resources.functions.config"

local log = require "resources.functions.log"[service_name]
local cache = require "resources.functions.cache"
local BasicEventService = require "resources.functions.basic_event_service"

local service = BasicEventService.new(log, service_name)

-- CHANNEL_DESTROY handler: when the event carries both `variable_refer_uuid`
-- and `variable_bridge_uuid`, remember `refer:<refer_uuid>` => `<bridge_uuid>`
-- in the cache (expiry argument 180).
local function on_channel_destroy(self, name, event)
  local refer_uuid  = event:getHeader('variable_refer_uuid')
  local bridge_uuid = event:getHeader('variable_bridge_uuid')
  if not (refer_uuid and bridge_uuid) then
    return
  end

  log.infof('found refer %s=>%s', refer_uuid, bridge_uuid)

  local ok, err = cache.set('refer:' .. refer_uuid, bridge_uuid, 180)
  -- a failure is only reported when the cache supplies an error value
  if not ok and err then
    log.errf('can not store key in memcached: %s', tostring(err))
  end
end

service:bind("CHANNEL_DESTROY", on_channel_destroy)

log.notice("start")
service:run()
log.notice("stop")
require 'resources.functions.config'
require 'resources.functions.trim'
local Database = require 'resources.functions.database'
local api = api or freeswitch.API()
--- Read a channel variable through the `uuid_getvar` API command.
-- @param uuid channel uuid
-- @param name variable name
-- @return the raw API reply on success;
--         `false` when the reply is exactly `_undef_`;
--         `nil, reply` when the reply starts with `-ERR`
function channel_variable(uuid, name)
  local reply = api:executeString("uuid_getvar " .. uuid .. " " .. name)

  -- plain-text search: position 1 means the reply starts with the error marker
  if reply:find('-ERR', 1, true) == 1 then
    return nil, reply
  end

  if reply ~= '_undef_' then
    return reply
  end
  return false
end
--- Expand an expression in the context of a channel using `eval uuid:<uuid>`.
-- Can be used to read headers such as `Call-State` or `Unique-ID`, e.g.
-- `channel_evalute(uuid, '${Unique-ID}')`.
-- @param uuid channel uuid
-- @param cmd  expression to expand
-- @return the raw API reply on success;
--         `false` when the reply is exactly `_undef_`;
--         `nil, reply` when the reply starts with `-ERR`
function channel_evalute(uuid, cmd)
  local command = string.format("eval uuid:%s %s", uuid, cmd)
  local reply = api:executeString(command)

  if string.sub(reply, 1, 4) == '-ERR' then
    return nil, reply
  elseif reply == '_undef_' then
    return false
  end
  return reply
end
--- Transfer a channel with the `uuid_transfer` API command.
-- The dialplan and context are optional: they are only appended to the
-- command when supplied, so a missing argument is no longer sent as the
-- literal text `nil`. Because the arguments are positional, a context
-- without a dialplan gets the dialplan `XML`.
-- @param uuid channel uuid
-- @param ext  destination extension
-- @param typ  (optional) dialplan, e.g. `XML`
-- @param ctx  (optional) dialplan context
-- @return the trimmed API reply on success;
--         `false` when the reply is exactly `_undef_`;
--         `nil, reply` when the reply starts with `-ERR`
function channel_transfer(uuid, ext, typ, ctx)
  local cmd = ("uuid_transfer %s %s"):format(uuid, ext)
  if typ ~= nil or ctx ~= nil then
    cmd = cmd .. (" %s"):format(typ or 'XML')
  end
  if ctx ~= nil then
    cmd = cmd .. (" %s"):format(ctx)
  end

  local result = trim(api:executeString(cmd))
  if result:sub(1, 4) == '-ERR' then return nil, result end
  if result == '_undef_' then return false end
  return result
end
--- Hang up a channel with the `uuid_kill` API command.
-- @param uuid  channel uuid
-- @param cause (optional) hangup cause; `CALL_REJECTED` when omitted
-- @return `true` when the trimmed API reply is exactly `+OK`, else `false`
function channel_kill(uuid, cause)
  local hangup_cause = cause or 'CALL_REJECTED'
  local command = string.format("uuid_kill %s %s", uuid, hangup_cause)
  local reply = trim(api:executeString(command))
  return reply == '+OK'
end
--- Send display text to a channel with the `uuid_display` API command.
-- The text is wrapped in single quotes inside the command line.
-- @param uuid channel uuid
-- @param text text to display
-- @return the trimmed API reply on success;
--         `false` when the reply is exactly `_undef_`;
--         `nil, reply` when the reply starts with `-ERR`
function channel_display(uuid, text)
  local command = string.format("uuid_display %s '%s'", uuid, text)
  local reply = trim(api:executeString(command))

  if string.sub(reply, 1, 4) == '-ERR' then
    return nil, reply
  elseif reply == '_undef_' then
    return false
  end
  return reply
end
--- Ask FreeSWITCH whether a channel exists (`uuid_exists` API command).
-- @param uuid channel uuid
-- @return `true` when the trimmed reply is exactly `true`, `false` for any
--         other non-error reply; `nil, reply` when the reply starts with `-ERR`
function channel_exists(uuid)
  local command = string.format("uuid_exists %s", uuid)
  local reply = trim(api:executeString(command))

  -- plain-text search: position 1 means the reply starts with the error marker
  if reply:find('-ERR', 1, true) == 1 then
    return nil, reply
  end
  return reply == 'true'
end
-- Cached reply of the `switchname` API command (nil until first success).
local _switchname

--- Return the switch name, asking FreeSWITCH only until one call succeeds.
-- @return the raw API reply (cached for later calls);
--         `false` when the reply is exactly `_undef_`;
--         `nil, reply` when the reply starts with `-ERR`
local function switchname()
  if not _switchname then
    local reply = api:executeString("switchname")
    if string.sub(reply, 1, 4) == '-ERR' then
      return nil, reply
    elseif reply == '_undef_' then
      return false
    end
    -- only successful replies are cached
    _switchname = reply
    return reply
  end
  return _switchname
end
--- Select the rows of the `channels` table on this switch that belong to a number.
-- A channel matches when `name`, `presence_id` or `presence_data` is LIKE
-- `<number>@<domain>` (`<number>@%` when no domain is given) or, when a
-- domain is given, when its context equals the domain and `cid_name` or
-- `cid_num` equals the number. Rows are ordered by `created_epoch`.
--
-- Values are embedded as SQL string literals, so single quotes are doubled.
-- Without a domain the context clause is left out instead of comparing
-- against the text `nil`.
--
-- @param number extension / number to look for
-- @param domain (optional) domain name; also used as the context name
-- @return array of rows
-- Raises an error when the switch name can not be read or the query fails;
-- the database handle is released in both the success and the failure case.
function channels_by_number(number, domain)
  local hostname = assert(switchname())

  -- render a value as a quoted SQL string literal
  local function quote(value)
    return "'" .. tostring(value):gsub("'", "''") .. "'"
  end

  local full_number = quote(number .. '@' .. (domain or '%'))

  local conditions = {}
  if domain ~= nil then
    conditions[#conditions + 1] = "(context = " .. quote(domain)
      .. " and (cid_name = " .. quote(number)
      .. " or cid_num = " .. quote(number) .. "))"
  end
  conditions[#conditions + 1] = "name like " .. full_number
    .. " or presence_id like " .. full_number
    .. " or presence_data like " .. full_number

  local sql = "select * from channels where hostname=" .. quote(hostname)
    .. " and (\n" .. table.concat(conditions, "\nor ") .. "\n)\n"
    .. "order by created_epoch\n"

  local dbh = Database.new('switch')
  local rows, err = dbh:fetch_all(sql)
  -- release before raising so a failed query does not leak the handle
  dbh:release()
  assert(rows, err)
  return rows
end
--
-- FusionPBX
-- Version: MPL 1.1
--
-- The contents of this file are subject to the Mozilla Public License Version
-- 1.1 (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
-- http://www.mozilla.org/MPL/
--
-- Software distributed under the License is distributed on an "AS IS" basis,
-- WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
-- for the specific language governing rights and limitations under the
-- License.
--
-- The Original Code is FusionPBX
--
-- The Initial Developer of the Original Code is
-- Mark J Crane <markjcrane@fusionpbx.com>
-- Copyright (C) 2010 - 2016
-- the Initial Developer. All Rights Reserved.
--
-- Contributor(s):
-- Mark J Crane <markjcrane@fusionpbx.com>
-- Errol W Samuels <ewsamuels@gmail.com>
--user defined variables
-- argv[1]: extension to intercept; when it is absent the script performs a
-- group intercept instead
local extension = argv[1];
-- argv[2]: which call directions may be picked up; when omitted it is
-- 'inbound' if an extension was given and 'all' otherwise
local direction = argv[2] or extension and 'inbound' or 'all';
-- number dialed on the other box when the call has to be proxied there;
-- we can use any number because other box should check sip_h_X_*** headers first
local pickup_number = '*8' -- extension and '**' or '*8'
--include config.lua
require "resources.functions.config";
--add the helper functions used below (explode, split_first, trim, channel_*)
require "resources.functions.explode";
require "resources.functions.split";
require "resources.functions.is_uuid";
require "resources.functions.trim";
require "resources.functions.channel_utils";
--prepare the api object (note: assigned as a global, not a local)
api = freeswitch.API();
--Get cache class
local cache = require "resources.functions.cache"
--Get intercept logger
local log = require "resources.functions.log".intercept
--include database class
local Database = require "resources.functions.database"
--include json library; it is only loaded when SQL debugging is enabled
--NOTE(review): `debug` here is presumably the settings table defined by
--config.lua rather than Lua's standard `debug` library -- confirm
local json
if (debug["sql"]) then
json = require "resources.functions.lunajson"
end
--get the hostname of this box, used to tell local channels from remote ones
local hostname = trim(api:execute("switchname", ""));
--- Redirect the current call to another box by bridging it there.
-- The target SIP profile and proxy default to `internal` and `call_hostname`;
-- an entry `CLUSTER_PEERS[call_hostname]` may override them, either as a
-- string (profile name) or as a table `{profile, proxy}`.
--
-- The caller's SIP credentials are added to the dial string when the session
-- has a `sip_auth_username`; without one the call is bridged without them
-- instead of failing on a nil concatenation.
--
-- @param destination   number to dial on the other box (domain is appended)
-- @param call_hostname host name of the box that owns the channel
local function make_proxy_call(destination, call_hostname)
  destination = destination .. "@" .. domain_name

  local profile, proxy = "internal", call_hostname
  local peer = CLUSTER_PEERS and CLUSTER_PEERS[proxy]
  if peer then
    if type(peer) == "string" then
      profile = peer
    else
      profile = peer[1] or profile
      proxy = peer[2] or proxy
    end
  end

  -- channel variables placed in the `{...}` prefix of the dial string
  local channel_vars = { "sip_invite_domain=" .. domain_name }

  local sip_auth_username = session:getVariable("sip_auth_username")
  if sip_auth_username and #sip_auth_username > 0 then
    local sip_auth_password = api:execute("user_data",
      sip_auth_username .. "@" .. domain_name .. " param password")
    channel_vars[#channel_vars + 1] = "sip_auth_username=" .. sip_auth_username
    channel_vars[#channel_vars + 1] = "sip_auth_password='" .. (sip_auth_password or "") .. "'"
  else
    log.notice("sip_auth_username is not set. Send call without credentials.")
  end

  -- kept local so the credentials do not linger in a global variable
  local dial_string = "{" .. table.concat(channel_vars, ",") .. "}sofia/"
    .. profile .. "/" .. destination .. ";fs_path=sip:" .. proxy

  log.notice("Send call to other host....")
  session:execute("bridge", dial_string)
end
--- Ask the caller for a PIN when one is configured.
-- @param pin_number comma separated list of accepted PINs, or nil/false when
--        no PIN is required
-- @return `true` when no PIN is required or the entered digits match one of
--         the accepted PINs (the matching PIN is stored in the session
--         variable `pin_number`); nothing after a failed attempt, in which
--         case the failure phrase is played and the session is hung up
local function pin(pin_number)
  -- nothing to check
  if not pin_number then
    return true
  end

  --sleep
  session:sleep(500)

  --get the user pin number: 2..20 digits, "3" tries, "5000" timeout, `#` terminates
  local entered = session:playAndGetDigits(
    2, 20, "3", "5000", "#",
    "phrase:voicemail_enter_pass:#", "", "\\d+"
  )

  --validate the user pin number
  local accepted = explode(",", pin_number)
  for _, candidate in pairs(accepted) do
    if entered == candidate then
      --set the authorized pin number that was used
      session:setVariable("pin_number", candidate)
      return true
    end
  end

  --if not authorized play a message and then hangup
  session:streamFile("phrase:voicemail_fail_auth:#")
  session:hangup("NORMAL_CLEARING")
end
--- Handle an intercept request that was forwarded from another box.
-- Two SIP headers are checked:
--   * `X-intercept_uuid`       - uuid of the channel to intercept directly;
--   * `X-child_intercept_uuid` - uuid of a child channel whose parent call
--     has to be found first (via `ent_originate_aleg_uuid`,
--     `cc_member_session_uuid` or `fifo_bridge_uuid`).
-- @return `true` when the request was handled here (the caller must stop),
--         nothing when neither header is present
local function proxy_intercept()
  -- Proceed calls from other boxes
  -- Check if this call comes from another box with intercept_uuid set
  local intercept_uuid = session:getVariable("sip_h_X-intercept_uuid")
  if intercept_uuid and #intercept_uuid > 0 then
    log.notice("Get intercept_uuid from sip header. Do intercept....")
    session:execute("intercept", intercept_uuid)
    return true
  end

  -- Check if this call comes from another box and we need the parent uuid
  local child_intercept_uuid = session:getVariable("sip_h_X-child_intercept_uuid")
  if (not child_intercept_uuid) or (#child_intercept_uuid == 0) then
    return
  end

  -- search parent uuid
  log.notice("Get child_intercept_uuid from sip header.")
  local parent_uuid =
    channel_variable(child_intercept_uuid, 'ent_originate_aleg_uuid') or
    channel_variable(child_intercept_uuid, 'cc_member_session_uuid') or
    channel_variable(child_intercept_uuid, 'fifo_bridge_uuid') or
    child_intercept_uuid

  if parent_uuid == child_intercept_uuid then
    log.notice("Can not found parent call. Try intercept child.")
    session:execute("intercept", child_intercept_uuid)
    return true
  end

  -- search parent hostname; kept local so it does not overwrite the
  -- `call_hostname` global used by the main script
  local call_hostname = hostname
  --[[ parent and child have to be on same box so we do not search it
  log.notice("Found parent channel try detect parent hostname")
  local dbh = Database.new('switch')
  local sql = "SELECT hostname FROM channels WHERE uuid='" .. parent_uuid .. "'"
  call_hostname = dbh:first_value(sql)
  dbh:release()
  if not call_hostname then
    log.notice("Can not find host name. Channels is dead?")
    return true
  end
  --]]

  if hostname == call_hostname then
    log.notice("Found parent call on local machine. Do intercept....")
    session:execute("intercept", parent_uuid)
    return true
  end

  -- only reachable once the host name lookup above is enabled
  log.noticef("Found parent call on remote machine `%s`.", call_hostname)
  session:execute("export", "sip_h_X-intercept_uuid=" .. parent_uuid)
  make_proxy_call(pickup_number, call_hostname)
  return true
end
--- Return the extensions that share a call group with the caller.
-- First the caller's `call_group` value is read from `v_extensions` (matching
-- `caller_id_number` against `extension` or `number_alias` in the current
-- domain); it is split on commas. Then every extension of the domain whose
-- `call_group` is LIKE `%<group>%` for one of those groups is selected; an
-- empty group matches extensions with an empty or NULL `call_group`.
-- @return array of numbers; `number_alias` is used instead of `extension`
--         when it is not empty
local function select_group_extensions()
  -- connect to Fusion database
  local dbh = Database.new('system')

  --get the call groups the extension is a member of
  local sql = "SELECT call_group FROM v_extensions "
    .. "WHERE domain_uuid = :domain_uuid "
    .. "AND (extension = :caller_id_number "
    .. "OR number_alias = :caller_id_number)"
    .. "limit 1"
  local params = {domain_uuid = domain_uuid, caller_id_number = caller_id_number}
  if (debug["sql"]) then
    log.noticef("SQL: %s; params: %s", sql, json.encode(params))
  end
  local call_group = dbh:first_value(sql, params) or ''
  log.noticef("call_group: `%s`", call_group)

  -- local: the list is only needed inside this function
  local call_groups = explode(",", call_group)

  --get the extensions in the call groups
  params = {domain_uuid = domain_uuid}
  local clauses = {}
  for key, group in ipairs(call_groups) do
    if #group == 0 then
      clauses[#clauses + 1] = "call_group = '' or call_group is NULL"
    else
      local param_name = "call_group_" .. tostring(key)
      clauses[#clauses + 1] = "call_group like :" .. param_name
      params[param_name] = '%' .. group .. '%'
    end
  end
  sql = "SELECT extension, number_alias FROM v_extensions "
    .. "WHERE domain_uuid = :domain_uuid "
    .. "AND (" .. table.concat(clauses, " OR ") .. ") "
  if (debug["sql"]) then
    log.noticef("SQL: %s; params: %s", sql, json.encode(params))
  end

  local extensions = {}
  dbh:query(sql, params, function(row)
    local member = row.extension
    if row.number_alias and #row.number_alias > 0 then
      member = row.number_alias
    end
    extensions[#extensions + 1] = member
    log.noticef("member `%s`", member)
  end)

  -- release Fusion database
  dbh:release()

  -- return result
  return extensions
end
-- Answer the call and copy the session variables the rest of the script
-- needs into globals (they are read by the helper functions above).
if session:ready() then
  --answer the session
  session:answer()

  --get session variables
  domain_uuid      = session:getVariable("domain_uuid")
  domain_name      = session:getVariable("domain_name")
  pin_number       = session:getVariable("pin_number")
  context          = session:getVariable("context")
  caller_id_number = session:getVariable("caller_id_number")
end

-- A request forwarded from another box is handled completely by
-- proxy_intercept(); nothing else is left to do in that case.
if session:ready() and proxy_intercept() then
  return
end

-- If a pin number is provided then require it; stop when it is not confirmed.
if session:ready() and not pin(pin_number) then
  return
end
-- Find the call to intercept.
-- Looks for one RINGING/EARLY channel whose presence_id belongs to the wanted
-- extension(s) and leaves its call uuid and owning host in the globals
-- `uuid` and `call_hostname`, which the code after this block reads.
-- `uuid` stays nil when nothing is ringing.
if session:ready() then
  -- select intercept mode
  local extensions
  if not extension then
    log.notice("GROUP INTERCEPT")
    extensions = select_group_extensions()
  else
    log.noticef("INTERCEPT %s", extension)
    extensions = {extension}
  end

  --connect to FS database
  local dbh = Database.new('switch')

  --check the database to get the uuid of a ringing call
  call_hostname = ""
  local sql = "SELECT uuid, call_uuid, hostname FROM channels "
  sql = sql .. "WHERE callstate IN ('RINGING', 'EARLY') "

  -- next check should prevent pickup call from extension
  -- e.g. if extension 100 dial some cell phone and some one else dial *8
  -- he can pickup this call.
  if not direction:find('all') then
    sql = sql .. "AND (1 <> 1 "
    -- calls from freeswitch to user
    if direction:find('inbound') then
      sql = sql .. "OR direction = 'outbound' "
    end
    -- calls from user to freeswitch
    if direction:find('outbound') then
      sql = sql .. "OR direction = 'inbound' "
    end
    sql = sql .. ") "
  end

  sql = sql .. "AND (1<>1 "
  local params = {}
  for key, ext in pairs(extensions) do
    local param_name = "presence_id_" .. tostring(key)
    sql = sql .. "OR presence_id = :" .. param_name .. " "
    params[param_name] = ext .. "@" .. domain_name
  end
  sql = sql .. ") "
  sql = sql .. "AND call_uuid IS NOT NULL "
  sql = sql .. "LIMIT 1 "
  if (debug["sql"]) then
    log.noticef("SQL: %s; params: %s", sql, json.encode(params))
  end

  local is_child, channel_uuid
  dbh:query(sql, params, function(row)
    is_child = (row.uuid == row.call_uuid)
    uuid = row.call_uuid
    channel_uuid = row.uuid
    call_hostname = row.hostname
  end)

  --release FS database; it is not used below, and releasing it here also
  --covers the early `return` taken for a child call on a remote machine
  dbh:release()

  -- tostring() because both values are nil when no ringing call was found
  log.notice("uuid: " .. tostring(uuid))
  log.notice("call_hostname: " .. tostring(call_hostname))

  -- it is can be attended transfer
  if uuid and not is_child and call_hostname == hostname then
    log.info('<<<<< Try detect attended transfer case >>>>>')
    -- test case with attended transfer when transferer hangup, but transfer
    -- target does not answer. In this case a-leg -- does not exists anymore
    if not channel_exists(uuid) then
      log.info('<<<<< no call channel assume it hangup >>>>>')
      -- external service have to put this info in temporary storage.
      -- It can be done e.g. in xml cdr handler or via Lua script using EventConsumer
      -- or in external ESL service
      local bridge_uuid, err = cache.get('refer:' .. uuid)
      log.info('<<<<< bridge_uuid ' .. tostring(bridge_uuid or err) .. ' >>>>>')
      if not bridge_uuid then
        -- in some cases FS calls `uuid_bridge` app before transfer target answer
        -- so we can try find `transfer_source` header to find out needed channel.
        -- Format is `<timestamp>:<channel_unique_id>:uuid_br:<channel_uuid>
        -- or `<timestamp>:<channel_uuid>:uuid_br:<channel_unique_id>.
        -- Seems we need handle only first one.
        log.info('<<<<< try detect uuid_bridge info >>>>>')
        bridge_uuid, err = channel_variable(channel_uuid, 'transfer_source')
        if bridge_uuid then
          log.info('<<<<< found transfer_source header >>>>>')
          local _, tail = split_first(bridge_uuid, ':uuid_br:', true)
          bridge_uuid = tail
          log.info('<<<<< bridge_uuid ' .. tostring(bridge_uuid) .. ' >>>>>')
        end
      end
      uuid = bridge_uuid or uuid
    end
  end

  if is_child then
    -- we need intercept `parent` call e.g. call in FIFO/CallCenter Queue
    if (call_hostname == hostname) then
      log.notice("Found child call on local machine. Try find parent channel.")
      local parent_uuid =
        channel_variable(uuid, 'ent_originate_aleg_uuid') or
        channel_variable(uuid, 'cc_member_session_uuid') or
        channel_variable(uuid, 'fifo_bridge_uuid') or
        uuid

      --[[ parent and child have to be on same box so we do not search it
      if parent_uuid ~= uuid then
        local sql = "SELECT hostname FROM channels WHERE uuid='" .. uuid .. "'"
        call_hostname = dbh:first_value(sql)
      end
      --]]

      if call_hostname then
        uuid = parent_uuid
        if call_hostname ~= hostname then
          log.noticef("Found parent call on remote machine `%s`.", call_hostname)
        else
          log.notice("Found parent call on local machine.")
        end
      end
    else
      log.noticef("Found child call on remote machine `%s`.", call_hostname)
      -- we can not find parent on this box because channel on other box so we have to
      -- forward call to this box
      session:execute("export", "sip_h_X-child_intercept_uuid=" .. uuid)
      return make_proxy_call(pickup_number, call_hostname)
    end
  end
end
log.noticef("Hostname: %s Call Hostname: %s", hostname, call_hostname)

--intercept a call that is ringing: only when a call was found and the
--session has no `billmsec` variable
if uuid ~= nil and session:getVariable("billmsec") == nil then
  if hostname ~= call_hostname then
    -- the call lives on another box: pass the uuid in a SIP header and
    -- forward this call there
    session:execute("export", "sip_h_X-intercept_uuid=" .. uuid)
    make_proxy_call(pickup_number, call_hostname)
  else
    session:execute("intercept", uuid)
  end
end
--notes
--originate a call
--cmd = "originate user/1007@voip.example.com &intercept("..uuid..")";
--api = freeswitch.API();
--result = api:executeString(cmd);
-- Implement service class for FusionPBX that uses LibUV library to connect to FS.
-- So it can be run from FS like `luarun service.lua` or as standalone process.
--
-- @usage
--
-- local service = EventService.new('blf', { '127.0.0.1', '8021', 'ClueCon',
-- reconnect = 5;
-- subscribe = {'PRESENCE_PROBE'}, filter = { ['Caller-Direction']='inbound' }
-- })
--
-- -- FS receive SUBSCRIBE to BLF from device
-- service:on("esl::event::PRESENCE_PROBE::*", function(self, eventName, event)
-- local proto = event:getHeader('proto')
-- end)
--
-- service:run()
--
local uv = require "lluv"
local ut = require "lluv.utils"
local ESLConnection = require "lluv.esl.connection".Connection
local Logging = require "log"
-- new_uuid() returns a fresh unique id. The implementation depends on where
-- the script runs: inside FreeSWITCH the `create_uuid` API command is used,
-- otherwise the standalone `uuid` module.
local new_uuid
if not freeswitch then
  local uuid_lib = require "uuid"
  function new_uuid()
    return uuid_lib.new()
  end
else
  local fs_api = require "resources.functions.api"
  function new_uuid()
    return fs_api:execute("create_uuid")
  end
end
--- Create a new pid file and start monitoring it.
-- The file `<pid_path>/<service name>.tmp` is written with a freshly
-- generated token. It is re-read whenever a file system event is reported for
-- it and on a 30000 ms repeating timer; when it can not be read or no longer
-- holds the token the event loop is stopped.
--
-- @param service  object providing `logger()` and `name()`
-- @param pid_path directory in which the pid file is created
-- @return table `{pid = <token>, file = <path>, valid = <boolean>}`; `valid`
--         becomes false when another instance overwrote the file, so the
--         caller can skip removing it
local function service_pid_file(service, pid_path)
  local log = service:logger()
  local service_name = service:name()

  -- small stack used to recycle read buffers
  local buffers = {
    push = function(self, v)
      self[#self + 1] = v
      return self
    end;
    pop = function(self)
      local v = self[#self];
      self[#self] = nil
      return v
    end;
  }

  -- read up to 1024 bytes of file `p`; calls `cb(err)` or `cb(nil, content)`
  local function read_file(p, cb)
    local buf = buffers:pop() or uv.buffer(1024)
    uv.fs_open(p, "r", function(file, err, path)
      if err then
        buffers:push(buf)
        return cb(err)
      end
      file:read(buf, function(file, err, data, size)
        file:close()
        if err then
          buffers:push(buf)
          return cb(err)
        end
        -- copy the content out first; the buffer goes back to the pool only
        -- after the read that uses it has finished
        local content = data:to_s(size)
        buffers:push(buf)
        return cb(nil, content)
      end)
    end)
  end

  -- (re)create file `p` with `data`; calls `cb(err)` when done
  local function write_file(p, data, cb)
    uv.fs_open(p, 'w+', function(file, err, path)
      if err then return cb(err) end
      file:write(data, function(file, err)
        file:close()
        return cb(err)
      end)
    end)
  end

  local uuid = new_uuid()
  local pid_file = pid_path .. "/" .. service_name .. ".tmp"
  local pid = {pid = uuid, file = pid_file, valid = true}

  -- guards against overlapping checks (timer and fs event may both fire)
  local test_in_progress

  local function test_pid_file()
    if test_in_progress then return end
    test_in_progress = true
    read_file(pid.file, function(err, data)
      test_in_progress = false
      if err then
        log.infof('can not read pid file: %s', tostring(err))
        return uv.stop()
      end
      if data ~= pid.pid then
        log.infof('detect launch second instance of service')
        pid.valid = false -- we do not remove it when stop service
        return uv.stop()
      end
    end)
  end

  -- create pid file
  uv.fs_mkdir(pid_path, function(loop, err)
    -- an already existing directory is fine
    if err and err:no() ~= uv.EEXIST then
      log.errf('can not create pid directory: %s', tostring(err))
      return uv.stop()
    end
    write_file(pid.file, pid.pid, function(err)
      if err then
        log.errf('can not create pid file: %s', tostring(err))
        return uv.stop()
      end
      uv.timer():start(30000, 30000, test_pid_file)
      uv.fs_event():start(pid.file, function(_, err, path, ev, ...)
        if err then
          log.warningf('can not start file monitoring')
        end
        return test_pid_file()
      end)
    end)
  end)

  return pid
end
--- Start the service process: run the event loop until it is stopped.
-- When running inside FreeSWITCH a pid file is created under
-- `<scripts_dir>/run` first and removed again afterwards, unless it was
-- marked as no longer valid.
-- @param service object providing `logger()` and `name()`
local function service_start(service)
  local logger = service:logger()

  local pid_info
  if freeswitch then
    require "resources.functions.config"
    pid_info = service_pid_file(service, scripts_dir .. "/run")
  end

  logger.infof('service %s started', service:name())

  -- errors raised inside the loop are logged instead of propagated
  local succeeded, failure = pcall(uv.run)
  if not succeeded then
    logger.errf('%s', tostring(failure))
  end

  logger.infof('service %s stopped', service:name())

  if pid_info and pid_info.valid then
    os.remove(pid_info.file)
  end
end
--- Register all listeners needed to manage the service status.
-- @param service the event service to attach the listeners to
-- @return the same `service`
local function service_init_loop(service)
  local log = service:logger()

  -- `fusion::service::control` events addressed to this service;
  -- only the `stop` command is handled
  local function on_custom(self, eventName, event)
    if event:getHeader('Event-Subclass') ~= 'fusion::service::control'
      or service:name() ~= event:getHeader('service-name')
    then
      return
    end
    if event:getHeader('service-command') == "stop" then
      log.infof('receive stop service command')
      return uv.stop()
    end
  end

  local function on_shutdown(self, eventName, event)
    log.infof('freeswitch shutdown')
    return uv.stop()
  end

  local function on_reconnect(self, eventName)
    log.infof('esl connected')
  end

  local function on_disconnect(self, eventName, err)
    log.infof('esl disconnected')
  end

  local function on_error(self, eventName, err)
    log.errf('esl error: %s', tostring(err))
  end

  local function on_close(self, eventName, err)
    -- print(eventName, err)
  end

  service:on("esl::event::CUSTOM::*", on_custom)
  service:on("esl::event::SHUTDOWN::*", on_shutdown)
  service:on('esl::reconnect', on_reconnect)
  service:on('esl::disconnect', on_disconnect)
  service:on('esl::error::**', on_error)
  service:on('esl::close', on_close)

  --! @todo way to stop service if it is running not from FS
  -- E.g. using LuaService on Windows and signals on *nix systems

  return service
end
--- Append value `v` after the last element of array `t`.
local function append(t, v)
  local next_index = #t + 1
  t[next_index] = v
end
-- ESL connection class extended with a logger, a service name and the
-- listeners / pid-file handling needed to run as a FusionPBX service.
local EventService = ut.class(ESLConnection) do

local base = ut.class.super(EventService)

-- Build the logger of a service: inside FreeSWITCH messages go to the
-- FreeSWITCH log prefixed with `[<service_name>] `, otherwise to stdout.
-- The `*f` method names used across the service are aliased to the plain
-- ones when the logger does not already provide them.
local function make_logger(service_name)
  local writer
  if freeswitch then
    local prefix_writer = require "log.writer.prefix"
    local tag = '[' .. service_name .. '] '
    writer = prefix_writer.new(tag, require "log.writer.freeswitch".new())
  else
    writer = require "log.writer.stdout".new()
  end

  local logger = Logging.new(writer, require "log.formatter.pformat".new())

  logger.errf     = logger.errf     or logger.error
  logger.warningf = logger.warningf or logger.warning
  logger.infof    = logger.infof    or logger.info
  logger.debugf   = logger.debugf   or logger.debug

  return logger
end

--- Constructor.
-- @param service_name name of the service
-- @param params (optional) ESL connection parameters. Note that the table is
--        modified: `subscribe` is created when missing and, inside
--        FreeSWITCH, the service control / SHUTDOWN events (and matching
--        filter entries when a non-empty filter is given) are added.
-- @return the result of `self:open()`
function EventService:__init(service_name, params)
  params = params or {}
  if not params.subscribe then
    params.subscribe = {}
  end

  if freeswitch then
    append(params.subscribe, 'CUSTOM::fusion::service::control')
    append(params.subscribe, 'SHUTDOWN')

    local filter = params.filter
    if filter and next(filter) then
      filter['Event-Subclass'] = 'fusion::service::control'
      filter['Event-name'] = 'SHUTDOWN'
    end
  end

  self = base(self, '__init', params)

  self._logger = make_logger(service_name)
  self._service_name = service_name

  service_init_loop(self)

  return self:open()
end

--- Run the service until the event loop is stopped.
function EventService:run()
  service_start(self)
end

--- Stop the event loop.
function EventService:stop()
  uv.stop()
end

--- Logger created for this service.
function EventService:logger()
  return self._logger
end

--- Name given to the constructor.
function EventService:name()
  return self._service_name
end

end
return EventService
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment