Skip to content

Instantly share code, notes, and snippets.

@wking
Last active September 18, 2015 22:45
Show Gist options
  • Save wking/cf87063bd5f76be73db0 to your computer and use it in GitHub Desktop.
Save wking/cf87063bd5f76be73db0 to your computer and use it in GitHub Desktop.
Upload product ↔ category associations
#!/usr/bin/env python3
#
# https://gist.github.com/wking/cf87063bd5f76be73db0
import csv
import json
import logging
import sys
import time
import urllib.request
logging.basicConfig(level=logging.INFO)
def read_categories(stream=sys.stdin):
categories = {'children': {}}
logging.info('loading categories')
for row in csv.DictReader(stream):
parent = categories
for field in ['Department', 'Aisle', 'Section', 'Shelf', 'Space']:
category = {
'name': row[field],
'short-name': row['{} URL'.format(field)],
'children': {},
}
if (category['name'] in ['', '*'] or
category['short-name'] in ['', '*']):
break
if category['name'] not in parent['children']:
parent['children'][category['name']] = category
parent = parent['children'][category['name']]
return categories
def upload_categories(base_url, headers, categories, parent_id=None):
logging.info('uploading categories')
if categories['children']:
current_categories = download_categories(
base_url=base_url, headers=headers, parent_id=parent_id)
for name, category in categories['children'].items():
category['parent'] = parent_id
if category['name'] not in current_categories:
category_id = upload_category(
base_url=base_url, headers=headers, category=category)
else:
category_id = current_categories[category['name']]
upload_categories(
base_url=base_url, headers=headers, categories=category,
parent_id=category_id)
def download_categories(base_url, headers, parent_id=None):
logging.info('download categories with parent {}'.format(parent_id))
if parent_id is None:
pid = 'null'
else:
pid = parent_id
request = urllib.request.Request(
url='{base}/categories?parent={pid}&limit=250'.format(
base=base_url, pid=pid),
headers=headers,
method='GET',
)
with urllib.request.urlopen(request) as response:
categories_bytes = response.read()
charset = response.headers.get_content_charset()
categories_json = categories_bytes.decode(charset)
categories = json.loads(categories_json)
name_ids = {cat['name']: cat['id'] for cat in categories}
logging.info('downloaded categories with parent {}: {}'.format(
parent_id, name_ids))
time.sleep(1)
return name_ids
def upload_category(base_url, headers, category):
data = category.copy()
data.pop('children')
logging.info('upload category {}'.format(data))
request = urllib.request.Request(
url='{base}/categories'.format(base=base_url),
data=json.dumps(data).encode('UTF-8'),
headers=headers,
method='POST',
)
with urllib.request.urlopen(request) as response:
new_category_bytes = response.read()
charset = response.headers.get_content_charset()
new_category_json = new_category_bytes.decode(charset)
new_category = json.loads(new_category_json)
logging.info('uploaded category {} with id {}'.format(
new_category['name'], new_category['id']))
time.sleep(1)
return new_category['id']
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('categories')
parser.add_argument('--base-url', default='https://api.azurestandard.com')
parser.add_argument(
'-H', '--header', action='append', default=[], dest='headers')
args = parser.parse_args()
headers = {}
for header in args.headers:
key, value = [x.strip() for x in header.split(':', 1)]
headers[key] = value
with open(args.categories, 'r') as stream:
categories = read_categories(stream=stream)
upload_categories(
base_url=args.base_url, headers=headers, categories=categories)
#!/usr/bin/env python3
#
# https://gist.github.com/wking/cf87063bd5f76be73db0
import csv
import json
import logging
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
logging.basicConfig(level=logging.INFO)
def get_categories(base_url, cache='/tmp/categories.json'):
try:
with open(cache, 'r') as stream:
categories_json = stream.read()
except FileNotFoundError:
logging.info('requesting categories')
categories = []
count = None
start = 0
limit = 250
while True:
logging.info('requesting categories ({} from {} of {})'.format(
limit, start, count))
with urllib.request.urlopen(
'{}/categories?{}'.format(
base_url,
urllib.parse.urlencode({
'start': start,
'limit': limit,
}))
) as response:
categories_bytes = response.read()
charset = response.headers.get_content_charset()
if count is None:
count = int(response.headers['Count'])
new_json = categories_bytes.decode(charset)
categories.extend(json.loads(new_json))
if len(categories) >= count:
break
start += limit
categories_json = json.dumps(categories)
with open(cache, 'w') as stream:
stream.write(categories_json)
categories = {}
cats = json.loads(categories_json)
for category in cats:
key = (category['name'], category.get('parent'))
categories[key] = category['id']
logging.debug('category {} -> {}'.format(key, category['id']))
return categories
def get_subheaders(base_url, headers, categories, stream=sys.stdin):
subheaders = {}
logging.info('loading subheaders')
for row in csv.DictReader(stream):
subheader = row['SubHeaders'].strip().lower()
if not subheader:
continue
parent_id = None
for field in ['Department', 'Aisle', 'Section', 'Shelf', 'Space']:
name = row[field]
if name in ['', '*']:
break
short_name = row['{} URL'.format(field)]
key = (name, parent_id)
try:
id = categories[key]
except KeyError as error:
logging.info('missing category {} ({})'.format(key, row))
try:
id = upload_category(
base_url=base_url, headers=headers, category={
'name': name,
'short-name': name,
'parent': parent_id,
})
except urllib.error.HTTPError:
with urllib.request.urlopen(
'{}/categories?{}'.format(
base_url,
urllib.parse.urlencode({'parent': parent_id}))
) as response:
categories_bytes = response.read()
charset = response.headers.get_content_charset()
categories_json = categories_bytes.decode(charset)
cats = json.loads(categories_json)
category = [cat for cat in cats if cat['name'] == name][0]
id = category['id']
categories[key] = id
parent_id = id
if subheader not in subheaders:
subheaders[subheader] = []
subheaders[subheader].append(id)
logging.debug('subheaders {} -> {}'.format(
subheader, subheaders[subheader]))
return subheaders
def upload_category(base_url, headers, category):
logging.info('upload category {}'.format(category))
request = urllib.request.Request(
url='{base}/categories'.format(base=base_url),
data=json.dumps(category).encode('UTF-8'),
headers=headers,
method='POST',
)
with urllib.request.urlopen(request) as response:
new_category_bytes = response.read()
charset = response.headers.get_content_charset()
new_category_json = new_category_bytes.decode(charset)
new_category = json.loads(new_category_json)
logging.info('uploaded category {} with id {}'.format(
new_category['name'], new_category['id']))
time.sleep(1)
return new_category['id']
def associate_products(base_url, headers, subheaders, stream=sys.stdin):
logging.info('associating products')
missing = set()
for row in csv.DictReader(stream):
subheader = row['SubHeaders'].strip().lower()
if subheader in [
'soon to be discontinued',
]:
continue
code = row['Item code'].strip()
try:
category_ids = subheaders[subheader]
except KeyError as error:
if subheader not in missing:
logging.error(str(error))
missing.add(subheader)
continue
for category_id in category_ids:
associate_product(
base_url=base_url, headers=headers,
code=code, category_id=category_id)
def associate_product(base_url, headers, code, category_id):
logging.info('associate {} with {}'.format(code, category_id))
request = urllib.request.Request(
url='{base}/packaged-product/{code}/category/{id}'.format(
base=base_url, code=code, id=category_id),
headers=headers,
method='POST',
)
with urllib.request.urlopen(request) as response:
logging.info('associated {} with {}'.format(code, category_id))
time.sleep(1)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('categories')
parser.add_argument('products')
parser.add_argument('--base-url', default='https://api.azurestandard.com')
parser.add_argument(
'-H', '--header', action='append', default=[], dest='headers')
args = parser.parse_args()
headers = {}
for header in args.headers:
key, value = [x.strip() for x in header.split(':', 1)]
headers[key] = value
categories = get_categories(base_url=args.base_url)
with open(args.categories, 'r') as stream:
subheaders = get_subheaders(
base_url=args.base_url, headers=headers,
categories=categories, stream=stream)
with open(args.products, 'r') as stream:
associate_products(
base_url=args.base_url, headers=headers,
subheaders=subheaders, stream=stream)
@wking
Copy link
Author

wking commented Feb 25, 2015

For the live upload, I dropped fix_subheaders. I converted the Excel files to CSV and ran:

$ ./category-upload.py -H 'Authorization: Basic ....' Category\ UI\ and\ URL\ FINAL.csv
$ ./product-category-upload.py -H 'Authorization: Basic ...' Category\ UI\ and\ URL\ FINAL.csv Catalog\ Data\ Source\ File\ FINAL.csv

The existing-categories download in product-category-upload.py doesn't work with the new limited queries from azurestandard/beehive@2728652 (Merge branch 'api-limit', 2015-02-20), but I worked around that by seeding /tmp/categories.json with data dumped from a Django shell. Now that we support start (azurestandard/api-spec@86d4d29, public.json: Add 'start=...' for offsetting list results, 2015-02-24), you could go that way too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment