Skip to content

Instantly share code, notes, and snippets.

@taf2
Last active November 9, 2021 02:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taf2/4bd56fdc865c344e5457b6353ecb8e5b to your computer and use it in GitHub Desktop.
Save taf2/4bd56fdc865c344e5457b6353ecb8e5b to your computer and use it in GitHub Desktop.
import os
import sys
import re
import websocket
import http.client as httplib
import base64
import json
import urllib
import requests
from requests.auth import HTTPBasicAuth
# Connection settings, all read from the environment; a missing variable
# raises KeyError at import time.
CTM_TOKEN = os.environ['CTM_TOKEN']  # API key of the account to connect; has the form a<account_id>d...
CTM_PASS = os.environ['CTM_SECRET']  # API secret of the account to connect
CTM_HOST = os.environ['CTM_HOST']  # REST API host; in production app.calltrackingmetrics.com
# NOTE(review): the original comment here repeated the REST host verbatim;
# presumably the websocket host differs -- confirm the production value.
CTM_SOCKS = os.environ['CTM_SOCKS']  # websocket host

# The account id is the run of digits between the leading "a" and the
# following "d" of the API key. The pattern is a raw string so that \d is
# handed to the regex engine instead of being an invalid string escape.
CTM_ACCOUNT_ID = re.search(r'a(\d+)d', CTM_TOKEN).group(1)

# Filled in by the start-up code at the bottom of the file before the socket
# is opened; on_message reads both when answering the access handshake.
CTM_USER_ID = None
CTM_CAPTOKEN = None
def on_message(wsapp, message):
    """Log every incoming frame and answer the server's access handshake.

    When the frame is exactly the ``access.handshake`` event, reply with an
    ``access.account`` event carrying the user id, account id and captoken
    held in the module-level CTM_* globals. Any other frame is only printed.

    Args:
        wsapp: the websocket app the frame arrived on; used to send the reply.
        message: the raw text frame received from the server.
    """
    print(message)
    if message != '42["access.handshake"]':
        return

    payload = json.dumps([
        "access.account",
        {"id": CTM_USER_ID, "account": CTM_ACCOUNT_ID, "captoken": CTM_CAPTOKEN},
    ])
    # The reply carries the same "42" prefix the handshake frame arrived with.
    wsapp.send("42" + payload)
    print("sent access")
def on_close(wsapp, close_status_code, close_msg):
    """Report that the websocket closed, with the close details if any.

    Args:
        wsapp: the websocket app that closed (unused).
        close_status_code: status code supplied with the close, possibly None.
        close_msg: message supplied with the close, possibly None.
    """
    # Because on_close was triggered, we know the opcode = 8
    print("on_close args:")
    if not (close_status_code or close_msg):
        return
    print("close status code: " + str(close_status_code))
    print("close message: " + str(close_msg))
    # TODO: reconnect if needed
def on_open(wsapp):
    """Announce that the connection opened and send an initial ping.

    Args:
        wsapp: the websocket app whose connection just opened.
    """
    print("on_open")
    wsapp.ping()
def on_ping(wsapp, message):
    """Log that a ping arrived; nothing is sent from here.

    Args:
        wsapp: the websocket app that received the ping (unused).
        message: the ping payload (unused).
    """
    print("Got a ping! A pong reply has already been automatically sent.")
def on_pong(wsapp, message):
    """Log that a pong arrived; no reply is sent.

    Args:
        wsapp: the websocket app that received the pong (unused).
        message: the pong payload (unused).
    """
    print("Got a pong! No need to respond")
def start_sockets():
    """Connect to the socket.io endpoint on CTM_SOCKS and run the websocket app.

    Wires the module's on_open / on_close / on_message / on_ping / on_pong
    callbacks into a WebSocketApp and hands control to ``run_forever``.
    """
    socket_url = f"wss://{CTM_SOCKS}/socket.io/?EIO=3&transport=websocket"
    print("connecting to: ", socket_url)

    callbacks = {
        "on_open": on_open,
        "on_close": on_close,
        "on_message": on_message,
        "on_ping": on_ping,
        "on_pong": on_pong,
    }
    wsapp = websocket.WebSocketApp(socket_url, **callbacks)
    # NOTE(review): the original kept a commented-out payload here,
    #   json.dumps(["calls.active", { "account": CTM_ACCOUNT_ID }])
    # presumably a subscription message that was never sent -- confirm before reviving.
    wsapp.run_forever(ping_interval=10, ping_timeout=5, ping_payload="3probe")
def get_user_id():
    """Fetch the account's user list and return the first user's ``uid``.

    Issues an authenticated GET to ``/api/v1/accounts/<account_id>/users`` on
    CTM_HOST using HTTP basic auth (CTM_TOKEN / CTM_PASS).

    Returns:
        The ``uid`` field of the first entry in the response's ``users`` list,
        or None when the response status is not 200 or the list is missing
        or empty.
    """
    url = "https://" + CTM_HOST + "/api/v1/accounts/" + CTM_ACCOUNT_ID + "/users"
    res = requests.get(url, auth=HTTPBasicAuth(CTM_TOKEN, CTM_PASS))
    if res.status_code != 200:
        return None

    # Named `body` rather than `json` so the imported json module is not shadowed.
    body = res.json()
    users = body.get("users") or []
    if not users:
        # An account with no users cannot be connected; report "not found"
        # instead of failing with IndexError.
        return None

    first_user = users[0]
    print(first_user)
    return first_user["uid"]
def get_captoken():
    """Request a captoken for the account and return it.

    Issues an authenticated POST to
    ``/api/v1/accounts/<account_id>/users/captoken`` on CTM_HOST using HTTP
    basic auth (CTM_TOKEN / CTM_PASS).

    Returns:
        The ``token`` field of the response body, or None when the response
        status is not 200.

    Raises:
        ValueError, SyntaxError: if the body is neither JSON nor a plain
            Python literal.
        KeyError: if the parsed body has no ``token`` field.
    """
    import ast  # local import: only needed for the literal fallback below

    url = "https://" + CTM_HOST + "/api/v1/accounts/" + CTM_ACCOUNT_ID + "/users/captoken"
    res = requests.post(url, auth=HTTPBasicAuth(CTM_TOKEN, CTM_PASS))
    if res.status_code != 200:
        return None

    txt = res.text
    # SECURITY: this used to be eval(txt), which executes whatever the remote
    # end sends back and also fails on JSON true/false/null. Parse as data
    # instead: JSON first, then a literal-only parse for any body the old code
    # accepted that is not strict JSON. Neither can run code.
    try:
        payload = json.loads(txt)
    except ValueError:
        payload = ast.literal_eval(txt)
    return payload['token']
# Start-up sequence: look up the user id, request a captoken, and open the
# socket only when both were obtained.
# NOTE(review): the source lost its indentation; this layout assumes the
# captoken request was NOT nested under the user-id check -- confirm.
CTM_USER_ID = get_user_id()
if CTM_USER_ID:
    print("got user_id: '%d'" % CTM_USER_ID)

CTM_CAPTOKEN = get_captoken()
if CTM_CAPTOKEN:
    print("got captoken: '%s'" % CTM_CAPTOKEN)

if CTM_USER_ID and CTM_CAPTOKEN:
    start_sockets()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment