Skip to content

Instantly share code, notes, and snippets.

@ksdme
Last active December 29, 2020 09:08
Show Gist options
  • Save ksdme/cce86ba6a517c1c78e2f79ce1e652f1a to your computer and use it in GitHub Desktop.
Save ksdme/cce86ba6a517c1c78e2f79ce1e652f1a to your computer and use it in GitHub Desktop.
Wrapper around the Digit-Eyes UPC Lookup API
# Based off of https://gist.github.com/julzhk/f94948012abd8645172eddbad5207a67
import os
import base64
import hashlib
import requests
import hmac
def get_default_app_key():
    """
    Returns the application key read from the DIGIT_EYES_KEY_K
    environment variable, or None when the variable is not set.
    """
    return os.getenv('DIGIT_EYES_KEY_K')
def get_default_authorization_key():
    """
    Returns the default authorization key used to sign requests.

    The key is read from the DIGIT_EYES_KEY_M environment variable. The
    application key already lives in DIGIT_EYES_KEY_K, so reading the same
    variable here would make the signing key always equal to the app key.

    For backward compatibility, DIGIT_EYES_KEY_K is still used as a
    fallback when DIGIT_EYES_KEY_M is not set. Returns None when neither
    variable is set.
    """
    auth_key = os.environ.get('DIGIT_EYES_KEY_M')
    if auth_key is not None:
        return auth_key

    # Legacy behaviour: earlier versions read the authorization key from
    # the same variable as the application key.
    return os.environ.get('DIGIT_EYES_KEY_K')
def generate_signature(upc, auth_key=None):
    """
    Returns a signature for a specific request payload.
    https://www.digit-eyes.com/cgi-bin/digiteyes.cgi?action=codeLibrary

    The signature is the base64 encoded HMAC-SHA1 digest of the UPC code,
    keyed with the authorization key.

    upc: the code to sign. Non-string values (e.g. ints) are converted
        with str(), matching how the code is rendered into the request url.
    auth_key: the authorization key; when None the value returned by
        get_default_authorization_key() is used.

    Raises ValueError when no authorization key is available.
    """
    if auth_key is None:
        auth_key = get_default_authorization_key()

    # Fail with an actionable message instead of the TypeError that
    # encoding None would produce.
    if auth_key is None:
        raise ValueError(
            'No Digit-Eyes authorization key available: pass auth_key or '
            'configure the default read by get_default_authorization_key().'
        )

    digest = hmac.new(
        auth_key.encode(),
        str(upc).encode(),
        hashlib.sha1,
    ).digest()

    return base64.b64encode(digest).decode()
def generate_request_url(upc, signature, fields='all', app_key=None):
    """
    Returns the request url.

    upc: the code to look up.
    signature: the request signature, as returned by generate_signature().
    fields: either a ready-made comma separated string of field names or
        a list/tuple of field names that gets joined with commas.
    app_key: the application key; when None the value returned by
        get_default_app_key() is used.

    All query values are percent-encoded. The signature is base64 and can
    contain '+', '/' and '=', which are not safe to place in a query
    string verbatim ('+' is commonly decoded as a space).
    """
    # Local import so this function does not depend on the module's
    # top-level import block.
    from urllib.parse import urlencode

    if app_key is None:
        app_key = get_default_app_key()

    if isinstance(fields, (tuple, list)):
        fields = ','.join(fields)

    # A sequence of pairs keeps the parameter order stable.
    query = urlencode([
        ('upcCode', upc),
        ('field_names', fields),
        ('language', 'en'),
        ('app_key', app_key),
        ('signature', signature),
    ])

    return f'https://www.digit-eyes.com/gtin/v2_0/?{query}'
def lookup(upc, fields='all', app_key=None, auth_key=None, timeout=10):
    """
    Lookup a given UPC Code on Digit-Eyes.

    upc: the code to look up.
    fields: field names to request, see generate_request_url().
    app_key: the application key, see generate_request_url().
    auth_key: the authorization key, see generate_signature().
    timeout: seconds to wait for the server, passed straight to
        requests.get(). Without a timeout a stalled connection would
        block the caller indefinitely.

    Returns a tuple (ok, payload) where ok is True only for an HTTP 200
    response and payload is the decoded JSON body.

    Raises requests.RequestException on connection failures or timeouts,
    and ValueError when the response body is not valid JSON.
    """
    signature = generate_signature(upc, auth_key)
    url = generate_request_url(upc, signature, fields, app_key)

    response = requests.get(
        url,
        headers={
            'cache-control': 'no-cache',
        },
        timeout=timeout,
    )

    return (response.status_code == 200, response.json())
class DigitEyes:
    """
    Provides a wrapper around Digit-Eyes, provides a single lookup method
    with optional caching integration.

    app_key / auth_key: credentials forwarded to the module level
        lookup(); None lets it fall back to its defaults.
    cache_reader: callable taking a upc and returning the cached payload,
        or None on a cache miss. Defaults to a reader that always misses.
    cache_writer: callable taking (upc, payload) that stores a successful
        result. Defaults to a no-op.
    default_fields: fields requested when lookup() is not given any.
    """

    def __init__(self,
                 app_key=None,
                 auth_key=None,
                 cache_reader=None,
                 cache_writer=None,
                 default_fields='all'):
        # Stored under the name that lookup() reads back; assigning from an
        # undefined `fields` name made every construction raise NameError.
        self.default_fields = default_fields
        self.app_key = app_key
        self.auth_key = auth_key
        self.cache_reader = cache_reader or (lambda upc: None)
        self.cache_writer = cache_writer or (lambda upc, payload: None)

    def lookup(self, upc, fields=None, skip_cache=False):
        """
        Performs lookups with caching support, skip_cache flag helps
        with optionally skipping reading with cache.

        Returns a tuple (ok, payload). A cache hit is returned as
        (True, cached) without contacting the service. Successful remote
        results are handed to cache_writer, also when skip_cache is set.
        """
        if not skip_cache:
            cached = self.cache_reader(upc)
            if cached is not None:
                return (True, cached)

        # `lookup` here is the module level function, not this method.
        ok, result = lookup(
            upc,
            fields or self.default_fields,
            self.app_key,
            self.auth_key,
        )

        if ok:
            self.cache_writer(upc, result)

        return (ok, result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment