Skip to content

Instantly share code, notes, and snippets.

@arsperger
Created June 14, 2018 11:07
Show Gist options
  • Save arsperger/e11d834b420e632a7d99d640cef7b726 to your computer and use it in GitHub Desktop.
Save arsperger/e11d834b420e632a7d99d640cef7b726 to your computer and use it in GitHub Desktop.
Python script for various test of CGRateS engine.
# jsonclient.py
# A simple JSONRPC client library, created to work with Go servers
# Written by Stephen Day
# Modified by Bruce Eckel to work with both Python 2 & 3
import json, socket, itertools, time
from datetime import datetime
class JSONClient(object):
def __init__(self, addr, codec=json):
self._socket = socket.create_connection(addr)
self._id_iter = itertools.count()
self._codec = codec
def _message(self, name, *params):
return dict(id=next(self._id_iter),
params=list(params),
method=name)
def call(self, name, *params):
request = self._message(name, *params)
id = request.get('id')
msg = self._codec.dumps(request)
self._socket.sendall(msg.encode())
# This will actually have to loop if resp is bigger
response = self._socket.recv(4096)
response = self._codec.loads(response.decode())
if response.get('id') != id:
raise Exception("expected id=%s, received id=%s: %s"
%(id, response.get('id'),
response.get('error')))
if response.get('error') is not None:
raise Exception(response.get('error'))
return response.get('result')
def close(self):
self._socket.close()
rpc =JSONClient(("172.19.0.14", 2012))
testlcr = {
"IgnoreErrors": True,
"MaxCost":"*event_cost",
"Tenant": "cgrates.org",
"ID": "SPL_LEASTCOST_TEST",
"Event": {
"Account": "5a72e6e5c3bb20002376b8a7",
"Destination": "XXXXXXYYYYY",
"Subject": "NONEU",
"Category": "Standard",
"Time": "2018-03-05T13:00:00Z",
"SetupTime": "2018-03-13T13:00:00Z",
#"AnswerTime": "0001-01-01T00:00:00Z",
"Cost": "-1",
"Usage": "60000000000",
#"Usage": "60s",
},
}
# alternative to the above
s = socket.create_connection(("172.19.0.14", 2012))
s.sendall(json.dumps({"id": 1, "method": "SupplierSv1.GetSuppliers", "params": [testlcr]}))
print(s.recv(4096))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment