Created
June 14, 2018 11:07
-
-
Save arsperger/e11d834b420e632a7d99d640cef7b726 to your computer and use it in GitHub Desktop.
Python script for various tests of the CGRateS engine.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# jsonclient.py
# A simple JSON-RPC client library, created to work with Go servers
# Written by Stephen Day
# Modified by Bruce Eckel to work with both Python 2 & 3
import json, socket, itertools, time | |
from datetime import datetime | |
class JSONClient(object):
    """Minimal JSON-RPC client that talks to a server over a TCP socket.

    Every request is sent as a JSON object with ``id``, ``params`` and
    ``method`` keys. The reply is expected to be a JSON object whose ``id``
    matches the request and which carries ``result`` and ``error`` keys.

    The client can be used as a context manager so that the socket is
    closed when the ``with`` block exits.
    """

    # Number of bytes requested from the socket on each read.
    _RECV_SIZE = 4096

    def __init__(self, addr, codec=json):
        """Open a connection to the server.

        addr:  address passed to ``socket.create_connection`` (host, port).
        codec: object providing ``dumps`` and ``loads``; defaults to ``json``.
        """
        self._socket = socket.create_connection(addr)
        self._id_iter = itertools.count()
        self._codec = codec

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _message(self, name, *params):
        """Build the request dict for method ``name`` with ``params``."""
        return dict(id=next(self._id_iter),
                    params=list(params),
                    method=name)

    def _read_response(self):
        """Read from the socket until the buffered bytes decode as one reply.

        A single ``recv`` may return only part of a large reply, so data is
        accumulated and decoding is retried after every chunk.
        """
        received = bytearray()
        while True:
            chunk = self._socket.recv(self._RECV_SIZE)
            if not chunk:
                # The peer closed the connection, so no more data will come.
                # Decode what we have and let the codec's own error propagate
                # if the reply is empty or incomplete.
                return self._codec.loads(received.decode())
            received.extend(chunk)
            try:
                return self._codec.loads(received.decode())
            except ValueError:
                # Incomplete JSON (or a multi-byte character split across
                # chunks): keep reading.
                continue

    def call(self, name, *params):
        """Invoke remote method ``name`` with ``params`` and return its result.

        Raises ``Exception`` when the reply's id does not match the request
        or when the reply carries a non-null ``error``.
        """
        request = self._message(name, *params)
        request_id = request.get('id')
        msg = self._codec.dumps(request)
        self._socket.sendall(msg.encode())

        response = self._read_response()

        if response.get('id') != request_id:
            raise Exception("expected id=%s, received id=%s: %s"
                            % (request_id, response.get('id'),
                               response.get('error')))

        if response.get('error') is not None:
            raise Exception(response.get('error'))

        return response.get('result')

    def close(self):
        """Close the underlying socket."""
        self._socket.close()
# Class-based client; connects to the server as soon as it is constructed.
rpc = JSONClient(("172.19.0.14", 2012))

# Request argument passed as the single element of "params" in the
# SupplierSv1.GetSuppliers request sent below.
testlcr = {
    "IgnoreErrors": True,
    "MaxCost": "*event_cost",
    "Tenant": "cgrates.org",
    "ID": "SPL_LEASTCOST_TEST",
    "Event": {
        "Account": "5a72e6e5c3bb20002376b8a7",
        "Destination": "XXXXXXYYYYY",
        "Subject": "NONEU",
        "Category": "Standard",
        "Time": "2018-03-05T13:00:00Z",
        "SetupTime": "2018-03-13T13:00:00Z",
        #"AnswerTime": "0001-01-01T00:00:00Z",
        "Cost": "-1",
        "Usage": "60000000000",
        #"Usage": "60s",
    },
}

# Alternative to using JSONClient: build the request by hand and send it
# over a raw socket.
s = socket.create_connection(("172.19.0.14", 2012))
# sendall() requires bytes on Python 3, so the JSON text is encoded first
# (JSONClient.call does the same).
s.sendall(
    json.dumps(
        {"id": 1, "method": "SupplierSv1.GetSuppliers", "params": [testlcr]}
    ).encode()
)
# NOTE: a single recv() may return only part of a reply larger than 4096 bytes.
print(s.recv(4096))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment