Skip to content

Instantly share code, notes, and snippets.

@aVolpe
Last active July 28, 2022 21:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aVolpe/afdbc575b7a76270afca494afe44c384 to your computer and use it in GitHub Desktop.
Save aVolpe/afdbc575b7a76270afca494afe44c384 to your computer and use it in GitHub Desktop.
jpos utils
#!/usr/bin/python
import sys
import xml.etree.cElementTree as ET
import string
import random
import socket
from datetime import datetime
# Show the usage text and stop when the command line holds nothing but the
# script name (i.e. not even the IP:PORT target was supplied).
if len(sys.argv) < 2:
    print("""
A script that sends xml messages similar to the jpos XMLPackager.
USAGE:
python send_message.py IP:PORT field_id:field_value ...
WHERE
IP:PORT is the ip/port of the jpos cmf port
...FIELD_ID:FIELD_VALUE: are the rest of params, each with the format field_id:field_value
if field_value starts with a _ is considered a binary in hex format
if field_value starts with a __ is considered a ISOAmount, and it will be splited by '-'
to separate between currency and amount
Some fields are automatically populated with random values like the STAN, RRN
Some fields are automatically populated with the current date, like the 7 and 12
USAGE WITH TEMPLATES:
This script also supports base messages that you can load as a 'template', to use this feature,
create a file with the extension '.xml', and pass as the third parameter:
python send_message.py IP:PORT template.xml field_id:field_value ...
This will change the rrn/stan/date fields, if you want to keep those fields, pass a four parameter
with the value 'keep':
python send_message.py IP:PORT template.xml keep field_id:field_value ...
""")
    # sys.exit is always available; the bare ``exit`` helper is injected by
    # the ``site`` module and is missing under ``python -S`` or when frozen.
    sys.exit(0)
def sortchildrenby(parent, attr):
    """
    Reorder the children of ``parent`` in place, ascending by the integer
    value of their ``attr`` attribute.

    Adapted from https://stackoverflow.com/a/25339725/1126380
    """
    def numeric_key(node):
        return int(node.get(attr))

    ordered = sorted(parent, key=numeric_key)
    # Slice assignment swaps the child list without replacing the element.
    parent[:] = ordered
def _pretty_print(current, parent=None, index=-1, depth=0):
    """
    Indent an element tree in place by rewriting ``text``/``tail`` with a
    line break plus one tab per nesting level.

    ``parent``, ``index`` and ``depth`` describe where ``current`` sits in
    the tree and are filled in by the recursive calls; external callers
    pass only the root element.

    Adapted from: https://stackoverflow.com/a/65808327/1126380
    """
    # Descend first so every subtree is laid out before its own position
    # inside the parent is handled.
    for position, child in enumerate(current):
        _pretty_print(child, current, position, depth + 1)

    if parent is None:
        return

    lead = '\n' + ('\t' * depth)
    if index == 0:
        # First child: the whitespace before it lives in the parent's text.
        parent.text = lead
    else:
        # Later children: it lives in the previous sibling's tail.
        parent[index - 1].tail = lead

    is_last_child = index == len(parent) - 1
    if is_last_child:
        # Step back one level so the parent's closing tag lines up.
        current.tail = '\n' + ('\t' * (depth - 1))
def _get_or_add_sub_iso(root, field_nro):
    """
    Return the nested 'isomsg' child of ``root`` whose id is ``field_nro``,
    creating and appending it first when no such child exists.
    """
    existing = root.find("isomsg[@id='" + field_nro + "']")
    if existing is not None:
        return existing
    created = ET.SubElement(root, 'isomsg')
    created.set('id', field_nro)
    return created
def _add_field(root, field_nro, field_value):
    """
    Add a 'field' to an 'isomsg', replacing any field with the same id.

    ``field_nro`` is either a plain id ('11') or a dotted path ('113.2');
    every path component before the last names a nested 'isomsg' that is
    created on demand.

    ``field_value`` selects the field type by its prefix:
      * '__CUR-AMOUNT' -> amount field; '_' inside AMOUNT becomes '.'
      * '_HEX'         -> binary field holding HEX
      * anything else  -> plain value (the empty string is allowed)

    Raises ValueError when an amount value has no '-' separator.
    """
    head, dot, rest = field_nro.partition('.')
    if dot:
        # Consume one path component per call so paths deeper than two
        # levels ('113.2.3') keep all of their components.
        _add_field(_get_or_add_sub_iso(root, head), rest, field_value)
        return

    field_value = str(field_value)
    # Work out the attributes before touching the tree, so a malformed
    # value cannot leave a half-built field behind.
    if field_value.startswith('__'):
        currency, dash, amount = field_value[2:].partition('-')
        if not dash:
            raise ValueError(
                "Invalid amount %r for field %s, expected __CURRENCY-AMOUNT"
                % (field_value, field_nro))
        attributes = [('type', 'amount'),
                      ('currency', currency),
                      ('value', amount.replace('_', '.'))]
    elif field_value.startswith('_'):
        attributes = [('value', field_value[1:]), ('type', 'binary')]
    else:
        attributes = [('value', field_value)]

    previous = root.find("field[@id='" + field_nro + "']")
    if previous is not None:
        root.remove(previous)

    field = ET.SubElement(root, "field")
    field.set('id', field_nro)
    for name, value in attributes:
        field.set(name, value)
def _add_param(root, param):
    """
    Parse a command-line param of the form field_id:field_value and add it
    to ``root``.

    Only the first ':' separates the id from the value, so the value may
    itself contain ':' characters.

    Raises ValueError when ``param`` contains no ':' at all.
    """
    field_nro, separator, field_value = str(param).partition(':')
    if not separator:
        raise ValueError(
            "Invalid parameter %r, expected field_id:field_value" % (param,))
    _add_field(root, field_nro, field_value)
def _generate_number(length):
    """
    Build a string made of ``length`` randomly chosen decimal digits.
    """
    picked = []
    for _ in range(length):
        picked.append(random.choice(string.digits))
    return ''.join(picked)
def _send_and_receive(ip_port, message):
    """
    Send a message, then wait indefinitely for a response.

    ``ip_port`` is a 'HOST:PORT' string. ``message`` may be bytes or text;
    text is encoded as UTF-8 before sending. A newline is sent after the
    message to mark its end.

    The reply (a single ``recv`` of at most 65000 bytes) is printed and
    also returned. The socket is always closed, whether the exchange
    succeeds or fails; any socket error propagates to the caller.
    """
    host, _, port = ip_port.rpartition(':')
    # Sockets only accept bytes on Python 3; ET.tostring already yields
    # bytes there, but plain strings are accepted too.
    if not isinstance(message, bytes):
        message = message.encode('utf-8')
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, int(port)))
        s.sendall(message)
        s.sendall(b'\n')  # send end of message
        print("Message send, waiting for reply")
        data = s.recv(65000)
        print("Message received:")
        print(data)
        return data
    finally:
        # Release the connection on success as well as on error.
        s.close()
def _add_date_fields(iso):
    """
    Stamp ``iso`` with the current local time: field 7 as MMDDhhmmss and
    field 12 as YYYYMMDDhhmmss.
    """
    # Take one timestamp so both fields describe the same instant.
    timestamp = datetime.now()
    layouts = (('7', '%m%d%H%M%S'), ('12', '%Y%m%d%H%M%S'))
    for field_nro, pattern in layouts:
        _add_field(iso, field_nro, timestamp.strftime(pattern))
if __name__ == "__main__":
    # Command line: IP:PORT [template.xml [keep]] [field_id:field_value ...]
    first_index = 2
    # The second argument is optional; running with only IP:PORT sends the
    # default message without overrides.
    param_2 = sys.argv[2] if len(sys.argv) > 2 else None
    if param_2 is not None and param_2.endswith('.xml'):
        print("Loading template " + param_2)
        first_index = 3
        iso = ET.parse(param_2).getroot()
        keep_template_fields = len(sys.argv) > 3 and sys.argv[3] == 'keep'
        if keep_template_fields:
            print("Keeping all fields from template")
            # Skip the 'keep' marker so it is not parsed as a field param.
            first_index = 4
        else:
            print("Replacing date and stan/rrn fields")
            _add_field(iso, '11', _generate_number(12))
            _add_field(iso, '37', _generate_number(12))
            _add_date_fields(iso)
    else:
        # No template: start from a built-in base message.
        iso = ET.Element("isomsg")
        _add_field(iso, '0', '2100')
        _add_field(iso, '11', _generate_number(12))
        _add_field(iso, '37', _generate_number(12))
        _add_field(iso, '41', _generate_number(16))
        _add_field(iso, '22', '_10000010020000000100000000000000')
        _add_field(iso, '113.2', '106')
        _add_date_fields(iso)

    # Whatever remains on the command line overrides or adds fields.
    replacements = sys.argv[first_index:]
    print('Replacements: ' + str(replacements))
    for x in replacements:
        _add_param(iso, x)

    ip_port = sys.argv[1]
    sortchildrenby(iso, 'id')
    _pretty_print(iso)
    # Serialize once so the printed text is exactly what gets sent.
    message = ET.tostring(iso)
    print("Sending message: ")
    print(message)
    print("To: " + ip_port)
    _send_and_receive(ip_port, message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment