Skip to content

Instantly share code, notes, and snippets.

@tantale
Created June 12, 2018 15:07
Show Gist options
  • Save tantale/2ed1592d1dc65fed19726ddf9936d670 to your computer and use it in GitHub Desktop.
Save tantale/2ed1592d1dc65fed19726ddf9936d670 to your computer and use it in GitHub Desktop.
# coding: utf-8
"""
Qronos Attributes encoder/decoder
=================================
:Created on: 2018-06-12
:Author: Laurent LAPORTE <llaporte@jouve.com>
"""
import datetime
import enum
import re
import six
from six.moves.urllib.parse import quote_plus
from six.moves.urllib.parse import unquote_plus
#: "nil" is the special value used by Qronos to represent empty or missing attributes.
#: ``None`` and empty strings are encoded to it, and it is decoded back to ``None``.
NIL = u"nil"
#: "ARRAY#|#" is the prefix marking a value which represents a PHP array.
#: In Python, we use it for ``list``, ``tuple`` and ``dict`` objects.
ARRAY = u"ARRAY#|#"
#: Standard attributes which don't need to be URL-encoded
#: (their encoded value is sent as-is, every other attribute is URL-quoted).
STANDARD_ATTRS = ['APP', 'ATTRIBUT', 'DATE', 'DATEL', 'DE', 'DEBUG', 'DELHISTORY', 'DOC', 'ETAT', 'FIC',
'FICHIER', 'ID', 'IMA', 'M', 'META', 'MODE', 'NBJ', 'NBP', 'NOM', 'OPE', 'OPEGEO', 'PRI',
'PROCESS', 'REP', 'REQ', 'SER', 'SORTIE', 'SYNC', 'TAILLE', 'TYPE', 'USER', 'VERS']
@enum.unique
class State(enum.Enum):
    """
    States exchanged through the ``ETAT`` attribute.

    Member names are the English names used in Python code; member values
    are the text labels actually written to / read from Qronos attributes.
    """

    PENDING = u'ATTENTE'
    IN_PROGRESS = u'EN_COURS'
    SUBCONTRACTED = u'SOUS_TRAITE'
    DELEGATED = u'DELEGUE'
    FAILURE = u'ECHEC'
    FINISHED = u'FINI'
#: Matches one ``NAME=value`` pair: the value is either enclosed in double quotes
#: (and may then contain spaces) or runs up to the next space character.
#: Compiled once at import time instead of on every call.
_ATTR_PATTERN = re.compile(r'(\w+)=("[^"]*"|[^ ]*)', re.S | re.I)


def parse_attrs(attrs):
    """
    Parse the attributes extracted from a Qronos response message.

    The string is scanned for ``NAME=value`` pairs. Surrounding double quotes
    are stripped from the values. When a name appears several times,
    the last occurrence wins.

    :type attrs: str
    :param attrs: String containing zero or more ``NAME=value`` pairs, or ``None``.
    :rtype: dict[str, str]
    :return: Parsed attributes (an empty dictionary if *attrs* is ``None`` or has no pair).
    """
    if attrs is None:
        return {}
    return {
        match.group(1): match.group(2).strip('"')
        for match in _ATTR_PATTERN.finditer(attrs)
    }
class QronosEncoder(object):
    """
    Convert Python values to and from the textual form used in Qronos attributes.

    Attributes listed in :attr:`encoders` / :attr:`decoders` get a dedicated
    conversion; all the other attributes go through the generic
    :meth:`encode` / :meth:`decode` methods.
    """

    #: Charset used to decode byte strings and to URL-quote/unquote attribute values.
    encoding = "CP1252"

    #: Formats tried first, in order, by :meth:`decode_date` (result is a ``datetime.date``).
    _DATE_FORMATS = ("%d/%m/%Y", "%Y-%m-%d", "%d%m%Y")
    #: Formats tried afterwards, in order, by :meth:`decode_date` (result is a ``datetime.datetime``).
    _DATETIME_FORMATS = ("%d/%m/%Y %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%d%m%Y%H%M%S")

    def encode(self, obj):
        """
        Encode a Python data structure into a string which Qronos can handle.

        :param obj: ``None``, ``bool``, text, bytes, number, ``list``/``tuple``,
            ``dict``, ``datetime.datetime`` or ``datetime.date``.
        :return: the text representation; ``None`` and empty strings give :data:`NIL`.
        :raises TypeError: if the type of *obj* is not supported.
        """
        if obj is None:
            return NIL
        elif isinstance(obj, bool):
            return six.text_type(obj)
        elif isinstance(obj, six.text_type):
            return obj if obj else NIL
        elif isinstance(obj, six.binary_type):
            return obj.decode(self.encoding) if obj else NIL
        elif isinstance(obj, (six.integer_types, float)):
            return six.text_type(obj)
        elif isinstance(obj, (list, tuple)):
            # support of PHP arrays in Igloo
            # (items are rendered with ``str.format``, they are not encoded recursively)
            return ARRAY + u"#|#".join(u"{i}>##>{v}".format(i=i, v=v) for i, v in enumerate(obj))
        elif isinstance(obj, dict):
            # support of PHP arrays in Igloo
            return ARRAY + u"#|#".join(u"{k}>##>{v}".format(k=k, v=v) for k, v in six.iteritems(obj))
        elif isinstance(obj, datetime.datetime):
            return six.text_type(obj.strftime("%d/%m/%Y %H:%M:%S"))
        elif isinstance(obj, datetime.date):
            # WARNING: ``datetime.datetime`` is a subclass of ``datetime.date``
            # so, we must have this ``elif`` after the one for ``datetime.datetime``
            return six.text_type(obj.strftime("%d/%m/%Y"))
        else:
            # Report the type (as the message announces), not the value itself.
            raise TypeError("cannot encode object of type: " + repr(type(obj)))

    def encode_compact_date(self, obj):
        """
        Encode dates without separators (``DDMMYYYY`` or ``DDMMYYYYHHMMSS``).

        Any other value is handled by :meth:`encode`.
        """
        if isinstance(obj, datetime.datetime):
            return six.text_type(obj.strftime("%d%m%Y%H%M%S"))
        elif isinstance(obj, datetime.date):
            return six.text_type(obj.strftime("%d%m%Y"))
        else:
            return self.encode(obj)

    def encode_french_date(self, obj):
        """
        Encode dates as ``DD/MM/YYYY`` or ``DD/MM/YYYY HH:MM:SS``.

        Any other value is handled by :meth:`encode`.
        """
        if isinstance(obj, datetime.datetime):
            return six.text_type(obj.strftime("%d/%m/%Y %H:%M:%S"))
        elif isinstance(obj, datetime.date):
            return six.text_type(obj.strftime("%d/%m/%Y"))
        else:
            return self.encode(obj)

    def encode_oui_non(self, obj):
        """
        Encode a boolean as ``"O"`` (true) or ``"N"`` (false).

        The test is done by equality, so ``1`` and ``0`` are encoded like
        ``True`` and ``False``. Any other value is handled by :meth:`encode`.
        """
        if obj in (True, False):
            return {True: u"O", False: u"N"}[obj]
        else:
            return self.encode(obj)

    def encode_state(self, obj):
        """
        Encode a :class:`State` member as its value.

        Any other value is handled by :meth:`encode`.
        """
        if isinstance(obj, State):
            return obj.value
        else:
            return self.encode(obj)

    def encode_int(self, obj):
        """
        Encode an integer as its decimal text representation.

        Any other value is handled by :meth:`encode`.
        """
        if isinstance(obj, six.integer_types):
            return six.text_type(obj)
        else:
            return self.encode(obj)

    #: Dedicated encoders, by attribute name (upper case).
    #: The values are plain functions: they are called with the encoder as first argument.
    encoders = {
        'DATE': encode_compact_date,
        'DATEL': encode_french_date,
        'DELHISTORY': encode_oui_non,
        'ETAT': encode_state,
        'NBJ': encode_int,
        'NBP': encode_int,
        'PRI': encode_int,
        'TAILLE': encode_int,
    }

    def encode_attrs(self, attrs):
        """
        Encode the attributes to send to Qronos.

        Attribute names are converted to upper case. Values of attributes which
        are neither in :attr:`encoders` nor in :data:`STANDARD_ATTRS` are URL-quoted
        using :attr:`encoding`.

        :type attrs: dict[str, object]
        :param attrs: Qronos attributes.
        :return: the dictionary of encoded attributes. Values are text strings.
        :rtype: dict[str, str]
        :raises TypeError: if a value has an unsupported type (see :meth:`encode`).
        """
        ret = {}
        encoders = self.encoders
        for key, obj in six.iteritems(attrs):
            key = key.upper()
            if key in encoders:
                # every attribute with a dedicated encoder is also a standard one: no quoting
                ret[key] = encoders[key](self, obj)
            else:
                encoded = self.encode(obj)
                if key in STANDARD_ATTRS:
                    ret[key] = encoded
                else:
                    # NOTE(review): the ``encoding`` keyword looks specific to Python 3's
                    # ``urllib.parse`` -- confirm Python 2 is not a target here.
                    ret[key] = quote_plus(encoded, encoding=self.encoding)
        return ret

    # noinspection PyMethodMayBeStatic
    def decode(self, text):
        """
        Decode a text value received from Qronos.

        :param text: text to decode, or ``None``.
        :return: ``None`` for ``None`` or :data:`NIL`; a ``list`` for an array with
            consecutive numeric indexes starting at 0 (the bare marker gives an
            empty list); a ``dict`` for any other array; else the text itself.
        :raises TypeError: if *text* is neither ``None`` nor a text string.
        """
        if text is None or text == NIL:
            return None
        elif isinstance(text, six.string_types) and text.startswith(ARRAY):
            # the type is checked first so that non-string values reach the ``TypeError`` below
            text = text[len(ARRAY):]
            if not text:
                # bare marker: this is what an empty list/tuple/dict is encoded to
                return []
            pairs = [kv.split(u'>##>', 1) for kv in text.split(u"#|#")]
            if all(pair[0] == six.text_type(i) for i, pair in enumerate(pairs)):
                # The value is an array with numeric indexes (translated into a Python list)
                return [pair[1] for pair in pairs]
            else:
                # The value is an array with string indexes (translated into a Python dict)
                return dict(pairs)
        elif isinstance(text, six.text_type):
            return text
        else:
            # Report the type (as the message announces), not the value itself.
            raise TypeError("cannot decode object of type: " + repr(type(text)))

    def decode_date(self, text):
        """
        Decode a date or a date-time.

        The date-only formats are tried first, then the date-time formats.

        :param text: text to decode.
        :return: a ``datetime.date``, a ``datetime.datetime``, or the result
            of :meth:`decode` if no format matches.
        """
        for fmt in self._DATE_FORMATS:
            try:
                return datetime.datetime.strptime(text, fmt).date()
            except (TypeError, ValueError):
                # TypeError: *text* is not a string (e.g. ``None``); let ``decode`` deal with it
                pass
        for fmt in self._DATETIME_FORMATS:
            try:
                return datetime.datetime.strptime(text, fmt)
            except (TypeError, ValueError):
                pass
        return self.decode(text)

    def decode_oui_non(self, text):
        """
        Decode ``"O"`` into ``True`` and ``"N"`` into ``False``.

        Any other value is handled by :meth:`decode`.
        """
        # Compare with the two exact values: a substring test (``text in u"ON"``)
        # would also accept ``""`` and ``"ON"``, then fail on the lookup.
        if text in (u"O", u"N"):
            return {u"O": True, u"N": False}[text]
        else:
            return self.decode(text)

    def decode_state(self, text):
        """
        Decode the value of a :class:`State` member into this member.

        Any other value is handled by :meth:`decode`.
        """
        for state in State:
            if state.value == text:
                return state
        return self.decode(text)

    def decode_int(self, text):
        """
        Decode an integer.

        Values which cannot be converted by ``int`` are handled by :meth:`decode`.
        """
        try:
            return int(text)
        except (TypeError, ValueError):
            # TypeError: *text* is not a string nor a number (e.g. ``None``)
            return self.decode(text)

    #: Dedicated decoders, by attribute name (upper case).
    #: The values are plain functions: they are called with the encoder as first argument.
    decoders = {
        'DATE': decode_date,
        'DATEL': decode_date,
        'DELHISTORY': decode_oui_non,
        'ETAT': decode_state,
        'NBJ': decode_int,
        'NBP': decode_int,
        'PRI': decode_int,
        'TAILLE': decode_int,
    }

    def decode_attrs(self, attrs):
        """
        Decode the attributes received from Qronos.

        Attribute names are converted to upper case and every value is
        URL-unquoted using :attr:`encoding`. The ``VAL`` attribute is itself
        parsed as a list of ``NAME=value`` pairs, which are decoded with
        :meth:`decode` and merged into the result (their names are kept as-is).

        :type attrs: dict[str, bytes]
        :param attrs: attributes parsed from a Qronos response (values are bytes or text).
        :rtype: dict[str, object]
        :return: Python dictionary containing the decoded attributes.
        """
        ret = {}
        decoders = self.decoders
        for key, value in six.iteritems(attrs):
            key = key.upper()
            if isinstance(value, six.binary_type):
                # use the Qronos charset rather than the interpreter's default codec
                value = value.decode(self.encoding)
            decoded = unquote_plus(value, encoding=self.encoding)
            if key in decoders:
                ret[key] = decoders[key](self, decoded)
            elif key == "VAL":
                val_attrs = parse_attrs(decoded)
                ret.update({k: self.decode(t) for k, t in six.iteritems(val_attrs)})
            else:
                ret[key] = self.decode(decoded)
        return ret
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment