Skip to content

Instantly share code, notes, and snippets.

@agran
Last active October 4, 2022 14:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save agran/a60e43a49846c2dd0ff8b2a058f0ff3a to your computer and use it in GitHub Desktop.
Save agran/a60e43a49846c2dd0ff8b2a058f0ff3a to your computer and use it in GitHub Desktop.
call to sip and play wav with opus codec with pjsua with python
# call to sip and play wav with opus codec with pjsua with python
# first part: https://gist.github.com/agran/52ed833b0f30ca4531c4ceb8de234b0a
# python sip_call_and_play.py 'sip:10.33.201.1' 'Le_Spirit1.wav' 'Le_Spirit2.wav'
import sys
import pjsua
import threading
import wave
from time import sleep
def dump(obj):
for attr in dir(obj):
print("obj.%s = %r" % (attr, getattr(obj, attr)))
def log_cb(level, str, len):
print str,
class MyAccountCallback(pjsua.AccountCallback):
sem = None
def __init__(self, account=None):
pjsua.AccountCallback.__init__(self, account)
def wait(self):
self.sem = threading.Semaphore(0)
self.sem.acquire()
def on_reg_state(self):
if self.sem:
if self.account.info().reg_status >= 200:
self.sem.release()
def cb_func(pid) :
print '%s playback is done' % pid
current_call.hangup()
# Callback to receive events from Call
class MyCallCallback(pjsua.CallCallback):
def __init__(self, call=None):
pjsua.CallCallback.__init__(self, call)
# Notification when call state has changed
def on_state(self):
global current_call
global in_call
print "Call with", self.call.info().remote_uri,
print "is", self.call.info().state_text,
print "last code =", self.call.info().last_code,
print "(" + self.call.info().last_reason + ")"
#dump(self.call.info())
#print json.dumps(self.call.info())
if self.call.info().state == pjsua.CallState.DISCONNECTED:
current_call = None
print 'Current call is', current_call
in_call = False
elif self.call.info().state == pjsua.CallState.CONFIRMED:
#Call is Answred
print "Call Answred"
#fName = "Le_Spirit1.wav"
for i in range(2,len(sys.argv)):
fName = sys.argv[i]
wfile = wave.open(fName)
time = (1.0 * wfile.getnframes ()) / wfile.getframerate ()
print fName + " " + str(time) + "ms"
wfile.close()
call_slot = self.call.info().conf_slot
self.wav_player_id=pjsua.Lib.instance().create_player(fName,loop=False)
self.wav_slot=pjsua.Lib.instance().player_get_slot(self.wav_player_id)
pjsua.Lib.instance().conf_connect(self.wav_slot, call_slot)
sleep(time)
pjsua.Lib.instance().player_destroy(self.wav_player_id)
self.call.hangup()
in_call = False
# Notification when call's media state has changed.
def on_media_state(self):
if self.call.info().media_state == pjsua.MediaState.ACTIVE:
print "Media is now active"
else:
print "Media is inactive"
# Function to make call
def make_call(uri):
try:
print "Making call to", uri
return acc.make_call(uri, cb=MyCallCallback())
except pjsua.Error, e:
print "Exception: " + str(e)
return None
lib = pjsua.Lib()
try:
lib.init(log_cfg = pjsua.LogConfig(level=4, callback=log_cb))
lib.create_transport(pjsua.TransportType.UDP, pjsua.TransportConfig(5080))
#print '\n'.join(sys.path)
#dump(lib.codecEnum())
lib.set_null_snd_dev()
lib.start()
lib.handle_events()
print
lib.set_codec_priority("opus/48000", 132)
infos = lib.enum_codecs()
for c in infos:
print c.name + " "+ str(c.channel_count) + " " + str(c.avg_bps) + " = " + str(c.priority)
# Create UDP transport which listens to any available port
transport = lib.create_transport(pjsua.TransportType.UDP)
# Create local/user-less account
acc = lib.create_account_for_transport(transport)
#dump(acc.codecEnum())
acc_cb = MyAccountCallback()
# Make call
#call = acc.make_call(sys.argv[1], acc_cb)
#acc_cb.wait()
print "\n"
print "Registration complete, status=", acc.info().reg_status, \
"(" + acc.info().reg_reason + ")"
#YOURDESTINATION is landline or mobile number you want to call
#dst_uri="sip:10.33.201.1"
dst_uri=sys.argv[1]
in_call = True
lck = lib.auto_lock()
current_call = make_call(dst_uri)
print 'Current call is', current_call
del lck
#wait for the call to end before shuting down
while in_call:
pass
#sys.stdin.readline()
lib.destroy()
lib = None
except pjsua.Error, e:
print "Exception: " + str(e)
lib.destroy()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment