Skip to content

Instantly share code, notes, and snippets.

@elcolumbio
Created September 1, 2017 19:51
Show Gist options
  • Save elcolumbio/dfbbc26e2585425e2f4d47879bc9c584 to your computer and use it in GitHub Desktop.
Save elcolumbio/dfbbc26e2585425e2f4d47879bc9c584 to your computer and use it in GitHub Desktop.
mws api python, flatten response dict
# Maybe this is helpful, or you can suggest improvements.
# I get lots of Latin-1-encoded text fields, so I use this try block to re-decode them as UTF-8.
def _repair_mojibake(value):
    """Return *value* with latin-1 mojibake re-decoded as UTF-8 when possible.

    Non-string values, and strings that cannot be round-tripped through
    latin-1 -> UTF-8, are returned unchanged (best effort, never raises).
    """
    if not isinstance(value, str):
        return value
    try:
        return value.encode('latin-1').decode('utf-8')
    except UnicodeError:
        # Either not representable in latin-1 or not valid UTF-8 afterwards:
        # the text was not mojibake, so keep it as it is.
        return value


def flatten_dict(d):
    """Flatten a nested response dict into a single-level dict.

    Keys of nested dicts are joined to their parent key with ".".
    String values are passed through a best-effort latin-1 -> UTF-8 repair;
    all other leaf values are kept as they are.

    Limitations of the list handling (unchanged from the original):
    every list element must be a dict; only dict-valued fields of those
    elements are emitted; their keys are NOT prefixed with the key of the
    list, so equal keys from different list items overwrite each other.

    Args:
        d: the (possibly nested) dict to flatten.

    Returns:
        A new flat dict.
    """
    def items():
        for key, value in d.items():
            if isinstance(value, dict):
                for subkey, subvalue in flatten_dict(value).items():
                    # The repair is applied again at every nesting level,
                    # exactly as the original code did.
                    yield key + "." + subkey, _repair_mojibake(subvalue)
            elif isinstance(value, list):
                for deep in value:
                    for xkey, xvalue in deep.items():
                        if isinstance(xvalue, dict):
                            for xsubkey, xsubvalue in flatten_dict(xvalue).items():
                                yield xkey + "." + xsubkey, _repair_mojibake(xsubvalue)
            else:
                # Previously the decoded result was discarded and the
                # conversion was repeated outside the try block, so any
                # non-string or non-mojibake value raised here.
                yield key, _repair_mojibake(value)

    return dict(items())
# Usage: flatten every response item and collect the flat dicts.
todflist.extend(flatten_dict(item) for item in resultlist)
@elcolumbio
Copy link
Author

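This does not work, because the response contains lists of dictionaries that cannot be flattened into plain key/value pairs:
the items in such a list share the same keys (and often the same values), so later entries overwrite earlier ones.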

@elcolumbio
Copy link
Author

My new version is more explicit. It looks horrible:
class ApiResponseParser():
    """Parse a dumped finance API response into flat result rows.

    The response is parsed page by page and its keys are matched against a
    YAML mapping file that lists, per event type, which keys to extract.
    A single ``resultdict`` is filled per transaction; "difficult" nested
    lists add one extra row each (``multirow_resultlist``).  Finished rows
    are collected in ``self.resultlist``.

    Constructing the parser immediately runs the whole pipeline (``main``).
    """

    def __init__(self,
                 path_to_response='masterK.yaml',
                 path_to_mappings='/home/flo/miniconda3/lib/python3.6/site-packages/mws/wrapper/finance_mapping.yaml'):
        """Store the file locations, reset all state and run ``main``.

        Args:
            path_to_response: YAML dump of the API response.
            path_to_mappings: YAML file with the per-event-type mappings.
        """
        self.path_to_response = path_to_response
        self.path_to_mappings = path_to_mappings
        self.response = dict()
        self.mappings = dict()
        self.multirow_resultlist = []
        self.resultlist = []
        # Working state; (re)assigned by parse_eventart / kombination2.
        self.resultdict = dict()
        self.eventart = None
        self.unnest = []
        self.depth = 0
        self.main()

    def main(self):
        """Load response and mappings, then parse every page of the response."""
        self.readapiresponse()
        self.readmappings()
        for page in self.response:
            print(type(page))
            self.parse_eventart(page)

    def readapiresponse(self):
        """Load the dumped API response from ``self.path_to_response``."""
        # The file is a YAML dump produced by the author's own API wrapper.
        with open(self.path_to_response, 'r') as f:
            # NOTE(review): the full (unsafe) loader is kept on purpose because
            # the dump may contain Python object tags; it is now named
            # explicitly since newer PyYAML requires a Loader argument.
            # Never point this at untrusted files; switch to yaml.safe_load
            # if the dump turns out to be plain data.
            self.response = yaml.load(f, Loader=yaml.Loader)

    def readmappings(self):
        """Load the per-event-type mappings from ``self.path_to_mappings``."""
        with open(self.path_to_mappings, 'r') as f:
            # The mapping file only holds plain dicts/lists/strings.
            self.mappings = yaml.safe_load(f)

    def parse_eventart(self, page):
        """Dispatch every non-empty event list of *page* to ``kombination2``.

        Keys named ``'value'`` and empty event lists are skipped.  Every other
        key must end in ``'List'`` (e.g. ``'ShipmentEventList'``).
        """
        for key_eventart, value_eventart in page.items():
            if (value_eventart == {}) or (key_eventart == 'value'):
                continue
            self.eventart = key_eventart
            self.unnest = self.eventart.split('List')
            assert len(self.unnest) == 2 and self.unnest[1] == ''
            try:
                unlist_value_eventart = value_eventart[self.unnest[0]]
            except KeyError:
                # The inner key does not follow the "<name>List" -> "<name>"
                # convention; fall back to the first key that is present.
                unlist_value_eventart = value_eventart[list(value_eventart.keys())[0]]
            # self.unnest[0] is stored later in the 'eventart' column,
            # e.g. {'eventart': 'ProductAdsPaymentEvent'}
            self.kombination2(unlist_value_eventart)

    def kombination2(self, value_eventart, reset=False):
        """Parse one event (dict) or several events (list of dicts).

        Args:
            value_eventart: a transaction dict or a list of transaction dicts.
            reset: ``True`` on the recursive call for a second-level list; the
                per-transaction state is then kept instead of being cleared.
        """
        self.depth = 0
        if isinstance(value_eventart, list):
            # list = more than one event item
            for transaction in value_eventart:
                if not reset:
                    self.multirow_resultlist = []
                    self.resultdict = dict()
                    self.depth = 0
                self.resultdict['eventart'] = self.unnest[0]
                self.mainkombination2(transaction)
        else:
            # only one transaction is recorded for this event
            # (note: multirow_resultlist is not cleared on this path)
            if not reset:
                self.resultdict = dict()
            self.resultdict['eventart'] = self.unnest[0]
            self.mainkombination2(value_eventart)

    def mainkombination2(self, transaction):
        """Extract the mapped keys of one transaction and emit result rows.

        Each key is looked up (lower-cased) in the mapping of the current
        event type: ``toplevel_keys`` are copied, ``toplevel_lists`` go to
        ``iterate_list``, ``secondlevel_lists`` trigger one recursion through
        ``kombination2`` and the lists named below a second-level list go to
        ``iterate_difficultlist``.  Afterwards rows are appended to
        ``self.resultlist`` depending on ``self.depth``.
        """
        for key, value in transaction.items():
            event_mappings = self.mappings[self.eventart.lower()]
            exist_secondlevel = event_mappings.get('secondlevel_lists', [])
            secondlevel = exist_secondlevel != []
            lowered = key.lower()

            # for each transaction we look if we want to save the values for the special key.
            if lowered in event_mappings['toplevel_keys']:
                # the value is nested too,
                # example {'invoiceId': 'HSH0GZZ1Q-5','postedDate': '2018-01-02T21:22:43Z'}
                self.resultdict[lowered] = value['value']
            elif lowered in event_mappings.get('toplevel_lists', []):
                selected_mappings = event_mappings['toplevel_lists'][lowered]
                # value = list and key is the name of the list
                self.iterate_list(value, key, selected_mappings)
            elif secondlevel:
                if lowered in exist_secondlevel:
                    # unpack further nested lists; the inner name is usually
                    # the outer name without "List" / "Adjustment"
                    new_values = value[key.split('List')[0].split('Adjustment')[0]]
                    # start the recursion; more toplevel_keys are found inside
                    # when a second-level list is specified
                    self.depth += 1
                    assert self.depth == 1
                    self.kombination2(new_values, reset=True)
                # now handle the difficult lists; we are inside the recursion
                # here because difficult lists live below a second-level list
                elif lowered in list(exist_secondlevel.values())[0].keys():
                    actual_secondlevelkey = list(exist_secondlevel.keys())[0]
                    selected_mappings = exist_secondlevel[actual_secondlevelkey][lowered]
                    self.iterate_difficultlist(value, key, selected_mappings)
            else:
                print('this key i couldnt find {}, eventart: {}, depth: {}, values: {}'.format(
                    key.lower(), self.eventart, self.depth, value))
        self.depth -= 1
        if self.depth == -2:
            # one output row per collected multi-row entry, each merged into
            # a copy of the shared top-level values
            for row in self.multirow_resultlist:
                masterdict = dict(self.resultdict)
                masterdict.update(row)
                self.resultlist.append(masterdict)
            # NOTE(review): the original had the no-op comparison
            # `self.multirow_resultlist == []` here, probably meant as a reset.
            # A real reset would make the check below also append the bare
            # resultdict, so the effective behaviour (no reset) is kept.
        if self.multirow_resultlist == []:
            self.resultlist.append(self.resultdict)

    def iterate_list(self, listvalues, listname, selected_mappings):
        """Copy the mapped features of a simple nested value into ``resultdict``.

        *listvalues* looks like
        ``{'CurrencyAmount': {'value': '0.0'}, 'CurrencyCode': {'value': 'EUR'}, 'value': '\\n'}``.
        Only works if every feature occurs once (e.g. one currency amount).
        If *selected_mappings* contains ``'appendkey'`` the result key is
        prefixed with the list name (cut at ``'Value'``) and an underscore.
        """
        if 'list' in listname.lower():
            listvalues = listvalues[listname.split('List')[0]]
        for feature in listvalues:
            if feature.lower() not in selected_mappings:
                continue
            if 'appendkey' in selected_mappings:
                nestedkey = listname.split('Value')[0].lower() + '_' + feature.lower()
            else:
                nestedkey = feature.lower()
            self.resultdict[nestedkey] = listvalues[feature]['value']

    def iterate_difficultlist(self, listvalues, listname, selected_mappings):
        """Create one multi-row entry per component of a "difficult" list.

        A difficult list yields one row in the final table per item; it is not
        a cartesian product, just resultdict x the items of the difficult
        lists (there can be several such lists per transaction).  The
        component and amount names are derived from *listname* by removing
        ``'Item'``, ``'List'`` and ``'Adjustment'``.  *selected_mappings* is
        accepted but not used.
        """
        stem = listname.replace('Item', '').replace('List', '').replace('Adjustment', '')
        nestedlistname = stem + 'Component'
        nestedlistname2 = stem + 'Amount'
        listvalues = listvalues[nestedlistname]
        if isinstance(listvalues, list):
            for item in listvalues:
                self.iterate_difficultlist2(item, nestedlistname2)
        elif isinstance(listvalues, dict):
            self.iterate_difficultlist2(listvalues, nestedlistname2)

    def iterate_difficultlist2(self, item, nestedlistname2):
        """Turn one component into a row and add it to ``multirow_resultlist``.

        The row holds ``'subeventart'`` (value of the last key of *item* that
        contains ``'Type'``) plus every dict-valued entry of
        ``item[nestedlistname2]`` under its lower-cased key.

        Raises:
            KeyError: if *item* has no *nestedlistname2* entry.
            AssertionError: if no (truthy) type value is found.
        """
        unnesteditem = item[nestedlistname2]
        # Bound up front so a missing "...Type" key triggers the assertion
        # below instead of an UnboundLocalError.
        subevent = None
        for key, values in item.items():
            if isinstance(key, str) and 'Type' in key:
                subevent = values['value']
        assert subevent, 'no "...Type" entry found in {}'.format(item)
        multirow_resultdict = {'subeventart': subevent}
        for key, value in unnesteditem.items():
            if isinstance(value, dict):
                multirow_resultdict[key.lower()] = value['value']
        self.multirow_resultlist.append(multirow_resultdict)

@elcolumbio
Copy link
Author

My YAML mapping file looks like this (some of the reports I still have to test):
refundeventlist :
  toplevel_keys : [amazonorderid, marketplacename, posteddate, quantityshipped, sellerorderid, sellersku]
  secondlevel_lists :
    shipmentitemadjustmentlist:
      itemchargeadjustmentlist : [chargetype, currencyamount, currencycode]
      itemfeeadjustmentlist : [feetype, currencyamount, currencycode]

shipmenteventlist :
  toplevel_keys : [amazonorderid, marketplacename, posteddate, quantityshipped, sellersku]
  secondlevel_lists :
    shipmentitemlist :
      itemchargelist : [chargetype, currencyamount, currencycode]
      itemfeelist : [feetype, currencyamount, currencycode]

chargebackeventlist :
  toplevel_keys : [amazonorderid, marketplacename, posteddate, quantityshipped, sellersku]
  secondlevel_lists :
    shipmentitemadjustmentlist:
      itemchargeadjustmentlist : [chargetype, currencyamount, currencycode]
      itemfeeadjustmentlist : [feetype, currencyamount, currencycode]

servicefeeeventlist :
  toplevel_keys : []
  secondlevel_lists :
    dummy:
      feelist : [feetype, currencyamount, currencycode]

adjustmenteventlist :
  toplevel_keys : [adjustmenttype, posteddate]
  toplevel_lists :
    adjustmentamount : [currencyamount, currencycode]
    adjustmentitemlist : [quantity, sellersku]

productadspaymenteventlist :
  toplevel_keys : [invoiceid, posteddate, transactiontype]
  toplevel_lists :
    taxvalue : [appendkey, currencycode, currencyamount]
    transactionvalue : [appendkey, currencycode, currencyamount]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment