Skip to content

Instantly share code, notes, and snippets.

@areski
Created April 7, 2011 17:39
Show Gist options
  • Save areski/908274 to your computer and use it in GitHub Desktop.
Save areski/908274 to your computer and use it in GitHub Desktop.
new sample - telefonie
# -*- coding: utf-8 -*-
from telephonie.core.inboundsocket import InboundEventSocket
from telephonie.core.errors import ConnectError
from telephonie.utils.logger import StdoutLogger
import gevent.event
import random
random.seed()
CONTACTS = (
'{originate_timeout=20}user/1000 &playback(/usr/local/freeswitch/sounds/en/us/callie/base256/8000/liberty.wav)',
#'{originate_timeout=20}user/1000 &playback(/usr/local/freeswitch/sounds/en/us/callie/base256/8000/liberty.wav)',
#'{originate_timeout=20}user/1000 &playback(/usr/local/freeswitch/sounds/en/us/callie/base256/8000/liberty.wav)',
)
class CampaignHandler():
def __init__(self):
self.log = log
self.active_campaign = {}
self.subscriber_per_campaign = {}
def check_campaign(self):
"""
retrieve the list of active campaigns
this will connect to RabbitMQ, Redis or API
"""
self.active_campaign[1] = True
self.active_campaign[2] = True
# we will pull the campaign settings too
def check_pending_subscriber(self, campaign):
"""
retrieve the list of subscriber to call in the next hoop
this will connect to RabbitMQ, Redis or API
and return a list of user to connect
We also retrieve the method to connect to those user :
(Gateways URI, Application to deliver)
"""
campaign = 1
self.subscriber_per_campaign[campaign] = ['{originate_timeout=20}user/1000 &playback(/usr/local/freeswitch/sounds/en/us/callie/base256/8000/liberty.wav)']
def get_pending_subscriber(self, campaign):
"""
retrieve the list of subscriber to call in the next hoop
this will connect to RabbitMQ, Redis or API
and return a list of user to connect
We also retrieve the method to connect to those user :
(Gateways URI, Application to deliver)
@campaign is the campaign ID
"""
campaign = 1
return self.subscriber_per_campaign[campaign]
class MyEventSocket(InboundEventSocket):
def __init__(self, host, port, password, filter="ALL", log=None):
InboundEventSocket.__init__(self, host, port, password, filter)
self.log = log
self.jobs = {}
self.hangups = {}
def track_hangup(self, job_uuid):
self.hangups[job_uuid] = gevent.event.AsyncResult()
def untrack_hangup(self, job_uuid):
try:
del self.hangups[job_uuid]
except:
pass
def track_job(self, job_uuid):
self.jobs[job_uuid] = gevent.event.AsyncResult()
def untrack_job(self, job_uuid):
try:
del self.jobs[job_uuid]
except:
pass
def on_background_job(self, ev):
'''
Receives callbacks for BACKGROUND_JOB event.
'''
job_uuid = ev['Job-UUID']
job_cmd = ev['Job-Command']
job_arg = ev['Job-Command-Arg']
self.log.debug("%s %s, args %s => %s" % (job_uuid, job_cmd, job_arg, ev.get_body()))
try:
async_result = self.jobs[job_uuid]
async_result.set(ev)
except KeyError:
# job is not tracked
return
def wait_for_job(self, job_uuid):
'''
Waits until BACKGROUND_JOB event was caught and returns Event.
'''
try:
async_result = self.jobs[job_uuid]
return async_result.wait()
except KeyError:
# job is not tracked
return None
def on_channel_hangup(self, ev):
'''
Receives callbacks for BACKGROUND_JOB event.
'''
print ev
job_uuid = ev['Job-UUID']
job_cmd = ev['Job-Command']
job_arg = ev['Job-Command-Arg']
self.log.debug("%s %s, args %s => %s" % (job_uuid, job_cmd, job_arg, ev.get_body()))
try:
async_result = self.hangups[job_uuid]
async_result.set(ev)
except KeyError:
# job is not tracked
return
def wait_for_channel_hangup(self, job_uuid):
'''
Waits until HANGUP event was caught and returns Event.
'''
try:
async_result = self.hangups[job_uuid]
return async_result.wait()
except KeyError:
# job is not tracked
return None
def spawn_originate(inbound_event_listener, contact, log):
fs_bg_api_string = \
"originate %s &playback(/usr/local/freeswitch/sounds/en/us/callie/base256/8000/liberty.wav)" \
% contact
bg_api_response = inbound_event_listener.bgapi(fs_bg_api_string)
log.info(str(bg_api_response))
job_uuid = bg_api_response.get_job_uuid()
if not job_uuid:
log.error("bgapi %s: job uuid not found" % fs_bg_api_string)
return
inbound_event_listener.track_job(job_uuid)
inbound_event_listener.track_hangup(job_uuid)
log.info("bgapi %s => Job-UUID %s" % (fs_bg_api_string, job_uuid))
log.info("waiting job %s ..." % job_uuid)
ev = inbound_event_listener.wait_for_job(job_uuid)
log.info("-----------------------------")
log.info("waiting hangup %s ..." % job_uuid)
ev = inbound_event_listener.wait_for_channel_hangup(job_uuid)
log.info("bgapi %s => %s" % (fs_bg_api_string, str(ev.get_body())))
if __name__ == '__main__':
log = StdoutLogger()
# Retrieve the next person to call
campaign_handler = CampaignHandler()
campaign_handler.check_campaign()
campaign_handler.check_pending_subscriber(1)
next_caller = campaign_handler.get_pending_subscriber(1)
print next_caller
try:
inbound_event_listener = MyEventSocket('127.0.0.1', 8021, 'ClueCon', filter="ALL", log=log)
try:
inbound_event_listener.connect()
except ConnectError, e:
log.error("connect failed: %s" % str(e))
raise SystemExit('exit')
if not CONTACTS:
log.error("No CONTACTS !")
raise SystemExit('exit')
pool = gevent.pool.Pool(len(CONTACTS))
for contact in CONTACTS:
pool.spawn(spawn_originate, inbound_event_listener, contact, log)
pool.join()
log.debug("all originate commands done")
except (SystemExit, KeyboardInterrupt):
pass
log.info("exit")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment