Skip to content

Instantly share code, notes, and snippets.

@josiahcarlson
Created August 5, 2024 17:57
Show Gist options
  • Select an option

  • Save josiahcarlson/ff633ed8f2f1cac2d50d0b1bab011021 to your computer and use it in GitHub Desktop.

Select an option

Save josiahcarlson/ff633ed8f2f1cac2d50d0b1bab011021 to your computer and use it in GitHub Desktop.
Example client and webhook handler implementation for Liminal Network API and Final Mile webhooks
# Copyright 2024 Liminal Network
# Released under the MIT license
# Example client and webhook handler implementation
import base64 # parsing requests
import datetime # sometimes we need to know what time it is
import json # for decoding some API reponses
import os # to delete temporary files
import sqlite3 # replace with your database access method
import urllib.parse # parsing requests
import urllib.request # can also use another 3rd party library
import urllib.error # for images being done
from typing import Union # for mypy complaints
# --- plain web interface to disk
# replace with your own secure credential store, and not the sandbox key
class settings:
# Keep the known sandbox key for the warning message in the command line
# interface.
KNOWN_SANDBOX_KEY = "qNAJFePYEfzZag1pDqv"
LIMINAL_NETWORK_FINAL_MILE_API_KEY = "qNAJFePYEfzZag1pDqv"
url = "https://api.liminalnetwork.com/{method}?auth={api_key}&pro={pro}"
def get_status(pro: str) -> dict:
"""
Args:
pro - the tracking number or pro of the shipment you would like information about
Returns one of:
{
"errors": [...]
}
OR
{
'delivery_date': '...',
'delivery_time': '...',
'status': '...',
'longstatus': '...',
'scac': '...',
'pro': '...'
}
"""
full_url = url.format(
method="status",
api_key=settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY,
pro=pro,
)
return json.loads(urllib.request.urlopen(full_url).read().decode())
def get_pdf_images(pro: str, which: str, test_output: bool = False):
"""
Args:
pro - shipment identifier
which - one of "lading" or "proof"
test_output - if true, check the content of the output to verify that
it is probably a PDF
Fetches the images for the given PRO from Liminal Network as a PDF,
saving to "<pro>_<which>.pdf" on the local filesystem.
Returns:
Error message returned by server on non-2xx response as dictionary
Filename of pdf stored for 2xx responses as string
"""
full_url = (
url.format(
method=which,
api_key=settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY,
pro=pro,
)
+ "&dl=1"
)
with urllib.request.urlopen(full_url) as resp:
rr = resp.read()
filename_header = resp.headers["content-disposition"]
if not filename_header:
return json.loads(rr.decode())
filename = filename_header.partition("=")[-1].strip('"')
if test_output:
assert filename.endswith(".pdf"), (
"File should be a pdf, not a: " + filename.rpartition(".")[-1]
)
assert b"PDF" in rr[:4], "File does not seem to be a pdf"
# should be <pro>.pdf
with open(filename, "wb") as out:
out.write(rr)
return filename
def get_individual_images(
pro: str, which: str, indexes: tuple = (), test_output: bool = False
):
"""
Args:
pro - shipment identifier
which - one of "lading" or "proof"
indexes - tuple or general iterable of indexes of images to fetch,
containing integer values 0 to 10, inclusive.
Default: (1,2,3,4,5,6,7,8,9,10)
Image 0 is the "header" image from the pdf, 1 is the first real
image.
test_output - if true, check the content of the each output file to
verify that it is probably a jpeg image
Fetches the images for the given PRO from Liminal Network as jpegs,
saving to "<pro>_<which>_<number>.jpg" on the local filesystem for any
images fetched.
Returns:
List of image filenames stored on the local disk.
"""
if not indexes:
# up to 5 normal images, 5 issue images
indexes = tuple(range(1, 11))
partial_url = (
url.format(
method=which,
api_key=settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY,
pro=pro,
)
+ "&dl=1"
)
written = []
for i in indexes:
with urllib.request.urlopen(partial_url + f"&image={i}") as resp:
rr = resp.read()
filename_header = resp.headers["content-disposition"]
if not filename_header:
# no more images
break
filename = filename_header.partition("=")[-1].strip('"')
if test_output:
expect = b"PNG" if filename.endswith(".png") else b"JFIF"
typ = filename.rpartition(".")[-1]
assert typ in ("png", "jpg"), f"Unexpected filetype: {typ}"
assert (
expect in rr[:8]
), f"{filename} does not have expected {typ} content"
# image=0 will be <pro>.png
# image=1+ will be <pro>_<image_type>.jpg
with open(filename, "wb") as out:
out.write(rr)
written.append(filename)
return written
# --- take web -> disk output and insert into sqlite database
def setup_schema(conn):
"""
Args:
conn - sqlite or other db connection with .execute() and .commit()
Applies embedded ddl to the given conn.
Note: syntax is valid SQLite3, unknown compatibility with other databases.
"""
ddl = [
"""
CREATE TABLE IF NOT EXISTS known_shipments(
pro TEXT UNIQUE ON CONFLICT REPLACE,
status TEXT,
longstatus TEXT,
delivery_time TEXT,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
);
""",
"""
CREATE TABLE IF NOT EXISTS shipment_images(
known_pro TEXT REFERENCES known_shipments(pro) ON DELETE CASCADE,
image_identifier TEXT,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
image_data BLOB,
UNIQUE (known_pro, image_identifier)
ON CONFLICT REPLACE
);
""",
]
for d_i in ddl:
conn.execute(d_i)
conn.commit()
def insert_data(conn, table: str, data: dict):
"""
Args:
conn - sqlite or other db connection with .execute() and .commit()
table - what table to insert into
data - data to insert into the database table
Inserts data into the provided table, committing the results when done.
Uses '?' for argument wildcards, may be compatible with other database
client libraries.
"""
cols = list(data)
columns = ",".join(cols)
vals = ",".join(len(cols) * ["?"])
query = f"INSERT INTO {table}({columns}) VALUES ({vals});" ""
conn.execute(query, [data[k] for k in cols])
conn.commit()
def get_status_to_db(conn, pro: str) -> dict:
"""
Args:
conn - sqlite3 connection object
pro - pro to get images for
Inserts the status metadata for the given pro into the database specified,
into the table named known_shipments. On error will not change the DB.
Returns:
get_status() call results for testing / verification
"""
status = get_status(pro)
if "errors" in status:
return status
to_insert = {
k: status[k] for k in ("pro", "status", "longstatus", "delivery_time")
}
insert_data(conn, "known_shipments", to_insert)
return status
def get_images_to_db(conn, pro: str):
"""
Args:
conn - sqlite3 connection object
pro - pro to get images for
Inserts all images for the given pro into the database specified,
into the table named shipment_images.
"""
for image_name in get_individual_images(pro, "proof"):
with open(image_name, "rb") as img:
# If you wanted to use an object store instead,
# change this code.
img_data = img.read()
to_insert = {
"known_pro": pro,
"image_identifier": image_name,
"image_data": img_data,
}
insert_data(conn, "shipment_images", to_insert)
# remember to delete the local temporary file
os.unlink(image_name)
def get_all_to_db(conn, pro: str):
"""
Args:
conn - sqlite3 connection object
pro - pro to get status and images for
Will call get_status_to_db(), and if the status is one to expect images,
will subsequently call get_images_to_db().
"""
status = get_status_to_db(conn, pro)
if "errors" not in status:
# can get any uploaded images any time there isn't an error
get_images_to_db(conn, pro)
# --- handle webhook requests generically
def handle_request(
conn, request_body: Union[str, bytes], body_base64_encoded: bool
) -> str:
"""
Args:
conn - database connection
request_body - the body of the http(s) request
body_base64_encoded - true if the body was base64 encoded, and must be decoded
Handles "start", "image", and "end" webhook requests, inserting their results into
our demonstration sqlite db.
"""
# decode the post body
if body_base64_encoded:
request_body = base64.b64decode(
request_body
if isinstance(request_body, bytes)
else request_body.encode("latin-1")
)
body = (
request_body
if isinstance(request_body, str)
else request_body.decode("latin-1")
)
# parse the post body
post_data = urllib.parse.parse_qs(body)
for it in ("ref", "what"):
if not post_data.get(it):
return f"error-{it}"
ref = post_data.get("ref")[0]
what = post_data.get("what")[0]
now_dt = datetime.datetime.now(datetime.timezone.utc)
now = now_dt.replace(microsecond=0, tzinfo=None).isoformat()
now += "+0000"
# dispatch
if what == "start":
handle_start(conn, now, ref)
return "ok"
if what == "image":
return handle_image(conn, post_data, now, ref)
if what == "end":
return handle_end(conn, post_data, now, ref)
return "error-unknown-" + what
def handle_start(conn, now: str, ref: str):
"""
Args:
conn - database connection
now - utcnow string
ref - pro, tracking, or reference number for the shipment
Ensures that ref is represented in known_shipments. If not,
will add the row with status of QR_SCANNED.
"""
exist_query = "SELECT pro FROM known_shipments WHERE pro = ?"
if list(conn.execute(exist_query, [ref])):
# don't re-insert
return
insert_data(
conn,
"known_shipments",
{
"pro": ref,
"status": "QR_SCANNED",
"longstatus": "QR Code was scanned",
"delivery_time": now,
},
)
def handle_image(conn, post_data: dict, now: str, ref: str) -> str:
"""
Args:
conn - database connection
post_data - {"name": ["val"], ...}
now - utcnow string
ref - pro, tracking, or reference number for the shipment
If "ok" is returned, will have processed and inserted the image in the post
data to the database.
Returns one of:
"ok"
"error-<what was the problem>"
"""
for it in ("filename", "image"):
if not post_data.get(it):
return f"error-{it}"
image = post_data["image"][0]
if not image.startswith("data:image/jpeg;base64,"):
return "error-image-data"
try:
image_bytes = base64.b64decode(image.partition(",")[-1].encode())
except Exception:
# POST body should support at least up to 418,000 bytes to
# receive image data. If your platform encodes base64 as handled
# by handle_request(), then you should ensure your post data limit
# is at least 558,000 bytes.
return "error-image-data"
# ensure foreign key exists
handle_start(conn, now, ref)
# will be <pro>_<image_type>.jpg, which matches the filenames returned
# by get_individual_images()
filename = post_data["filename"][0]
insert_data(
conn,
"shipment_images",
{
"known_pro": ref[0],
"image_identifier": filename,
"image_data": image_bytes,
},
)
return "ok"
def handle_end(conn, post_data: dict, now: str, ref: str) -> str:
"""
Args:
conn - database connection
post_data - {"name": ["val"], ...}
now - utcnow string
ref - pro, tracking, or reference number for the shipment
If "ok" is returned, will have updated the status of the provided
shipment to whatever was provided in the post_data.
Returns one of:
"ok"
"error-<what was the problem>"
"""
if not post_data.get("longstatus"):
return "error-longstatus"
# this matches what is returned by get_status()
status, _, longstatus = post_data.get("longstatus")[0].partition(" - ")
# ensure the row exists
handle_start(conn, now, ref)
# update the row
conn.execute(
"""
UPDATE known_shipments
SET status = ?, longstatus = ?
WHERE pro = ?
""",
[status, longstatus, ref],
)
conn.commit()
return "ok"
def main():
import argparse
import tempfile
parser = argparse.ArgumentParser()
parser.add_argument(
"--verbose",
default=False,
action="store_true",
help="Provide to receive additional output",
)
parser.add_argument(
"--creds",
default=None,
help="The 'package.module.attribute' to use for your API Key",
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--database",
default=False,
action="store_true",
help="Provide to store in a local sqlite db, see --sqlite-file for name",
)
group.add_argument(
"--files",
default=False,
action="store_true",
help="Provide to store in individual .jpg and .json files, see --dirname for name",
)
parser.add_argument(
"--sqlite-file",
default="finalmile_test.sqlite3",
help="Sqlite database to store our data to",
)
parser.add_argument(
"--dirname",
default=tempfile.gettempdir(),
help="Where to store downloaded files",
)
group2 = parser.add_mutually_exclusive_group(required=True)
group2.add_argument(
"--dispatch",
help="The filename of the json-encoded [request_body, is_base64_encoded] stored inside for dispatching via handle_request() (requires --database)",
)
group2.add_argument(
"pro",
nargs="*",
default=[],
help="The tracking number, pro, or reference number for your shipment",
)
args = parser.parse_args()
if args.dispatch and not args.database:
print("--dispatch requires --database")
return exit(1)
if not args.creds:
if (
args.verbose
and settings.KNOWN_SANDBOX_KEY
== settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY
):
print("Warning: using SANDBOX API Key")
else:
import importlib
module, _, attribute = args.creds.rpartition(".")
package = None
if "." in module:
package, _, module = module.rpartition(".")
mod = importlib.import_module(module, package)
settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY = getattr(mod, attribute)
if args.verbose:
print("Used API Key from", args.creds)
if args.database:
if args.verbose:
print("Opening database and ensuring schema validity")
sq_conn = sqlite3.Connection(args.sqlite_file)
setup_schema(sq_conn)
if args.dispatch:
if args.verbose:
print("Opening", args.dispatch, "for incoming request data")
with open(args.dispatch, "r") as inp:
body, is_base64 = json.load(inp)
resp = handle_request(sq_conn, body, is_base64)
print("Response to request data:", resp)
sq_conn.close()
return
elif args.database:
for pro in args.pro:
if args.verbose:
print("Fetching", pro)
get_all_to_db(sq_conn, pro)
sq_conn.close()
return
# output to files
cwd = os.getcwd()
try:
os.chdir(args.dirname)
for pro in args.pro:
if args.verbose:
print("Fetching", pro)
status = get_status(pro)
if "error" not in status:
with open(f"{pro}.json", "w") as out:
json.dump(status, out)
get_individual_images(pro, "proof")
elif args.verbose:
print("Shipment had error:", status)
finally:
os.chdir(cwd)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment