-
-
Save josiahcarlson/ff633ed8f2f1cac2d50d0b1bab011021 to your computer and use it in GitHub Desktop.
Example client and webhook handler implementation for Liminal Network API and Final Mile webhooks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Copyright 2024 Liminal Network | |
| # Released under the MIT license | |
| # Example client and webhook handler implementation | |
| import base64 # parsing requests | |
| import datetime # sometimes we need to know what time it is | |
| import json # for decoding some API responses | |
| import os # to delete temporary files | |
| import sqlite3 # replace with your database access method | |
| import urllib.parse # parsing requests | |
| import urllib.request # can also use another 3rd party library | |
| import urllib.error # for images being done | |
| from typing import Union # for mypy complaints | |
| # --- plain web interface to disk | |
| # replace with your own secure credential store, and not the sandbox key | |
class settings:
    """Credential holder for this example; replace with a secure store."""

    # Publicly known sandbox key. The command line interface compares the
    # active key against this value to decide whether to print a warning.
    KNOWN_SANDBOX_KEY = "qNAJFePYEfzZag1pDqv"
    # Key that is actually sent with API requests. It starts out as the
    # sandbox key and may be overwritten at runtime (see --creds in main()).
    LIMINAL_NETWORK_FINAL_MILE_API_KEY = KNOWN_SANDBOX_KEY


# Template shared by every API call; filled in with str.format().
url = "https://api.liminalnetwork.com/{method}?auth={api_key}&pro={pro}"
def get_status(pro: str) -> dict:
    """
    Fetch the current status of a shipment from the Liminal Network API.

    Args:
        pro - the tracking number or pro of the shipment you would like
            information about

    Returns one of:
        {
            "errors": [...]
        }
    OR
        {
            'delivery_date': '...',
            'delivery_time': '...',
            'status': '...',
            'longstatus': '...',
            'scac': '...',
            'pro': '...'
        }

    Raises:
        urllib.error.URLError (including HTTPError) if the request fails,
        ValueError if the response body cannot be decoded as JSON.
    """
    full_url = url.format(
        method="status",
        api_key=settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY,
        # percent-encode so characters such as "&", "#" or spaces inside the
        # pro cannot change the meaning of the query string
        pro=urllib.parse.quote(pro, safe=""),
    )
    # the context manager closes the connection even if read() raises
    with urllib.request.urlopen(full_url) as resp:
        return json.loads(resp.read().decode())
def get_pdf_images(pro: str, which: str, test_output: bool = False):
    """
    Download the images for a shipment as a single PDF.

    Args:
        pro - shipment identifier
        which - one of "lading" or "proof"
        test_output - if true, check the content of the output to verify that
            it is probably a PDF

    Fetches the images for the given PRO from Liminal Network as a PDF and
    saves it in the current working directory, using the file name the
    server supplies in its Content-Disposition header.

    Returns:
        Error message returned by the server as a dictionary, when the
            response carries no Content-Disposition header or is a non-2xx
            response with a JSON body
        Filename of the stored pdf as a string otherwise

    Raises:
        urllib.error.HTTPError - non-2xx response whose body is not JSON
        AssertionError - test_output is true and the download does not look
            like a PDF
    """
    full_url = (
        url.format(
            method=which,
            api_key=settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY,
            # percent-encode so the pro cannot alter the query string
            pro=urllib.parse.quote(pro, safe=""),
        )
        + "&dl=1"
    )
    try:
        with urllib.request.urlopen(full_url) as resp:
            rr = resp.read()
            filename_header = resp.headers["content-disposition"]
    except urllib.error.HTTPError as err:
        # urlopen() raises on non-2xx responses, so the documented "error
        # dictionary" can only be produced from the exception's body.
        try:
            return json.loads(err.read().decode())
        except ValueError:
            # body was not JSON: surface the original HTTP failure
            raise err from None

    if not filename_header:
        return json.loads(rr.decode())

    # The name comes from the remote server; basename() keeps it from
    # pointing outside the current directory.
    filename = os.path.basename(filename_header.partition("=")[-1].strip('"'))

    if test_output:
        assert filename.endswith(".pdf"), (
            "File should be a pdf, not a: " + filename.rpartition(".")[-1]
        )
        assert b"PDF" in rr[:4], "File does not seem to be a pdf"

    with open(filename, "wb") as out:
        out.write(rr)
    return filename
def get_individual_images(
    pro: str, which: str, indexes: tuple = (), test_output: bool = False
):
    """
    Download the images for a shipment as individual image files.

    Args:
        pro - shipment identifier
        which - one of "lading" or "proof"
        indexes - tuple or general iterable of indexes of images to fetch,
            containing integer values 0 to 10, inclusive.
            Default: (1,2,3,4,5,6,7,8,9,10)
            Image 0 is the "header" image from the pdf, 1 is the first real
            image.
        test_output - if true, check the content of each output file to
            verify that it is probably a png or jpeg image

    Each image is saved in the current working directory under the file
    name the server supplies in its Content-Disposition header. Fetching
    stops at the first response that carries no such header.

    Returns:
        List of image filenames stored on the local disk.

    Raises:
        urllib.error.URLError (including HTTPError) if a request fails
        AssertionError - test_output is true and a download does not look
            like the image type its file name claims
    """
    if not indexes:
        # up to 5 normal images, 5 issue images
        indexes = tuple(range(1, 11))

    partial_url = (
        url.format(
            method=which,
            api_key=settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY,
            # percent-encode so the pro cannot alter the query string
            pro=urllib.parse.quote(pro, safe=""),
        )
        + "&dl=1"
    )

    written = []
    for i in indexes:
        # NOTE(review): urlopen() raises urllib.error.HTTPError for non-2xx
        # responses. If the API signals "no more images" that way, the
        # exception propagates from here - confirm against the API.
        with urllib.request.urlopen(partial_url + f"&image={i}") as resp:
            rr = resp.read()
            filename_header = resp.headers["content-disposition"]
        if not filename_header:
            # no more images
            break

        # The name comes from the remote server; basename() keeps it from
        # pointing outside the current directory.
        filename = os.path.basename(
            filename_header.partition("=")[-1].strip('"')
        )

        if test_output:
            expect = b"PNG" if filename.endswith(".png") else b"JFIF"
            typ = filename.rpartition(".")[-1]
            assert typ in ("png", "jpg"), f"Unexpected filetype: {typ}"
            assert (
                expect in rr[:8]
            ), f"{filename} does not have expected {typ} content"

        # per the original notes: image=0 is a .png, image=1+ are .jpg
        with open(filename, "wb") as out:
            out.write(rr)
        written.append(filename)
    return written
| # --- take web -> disk output and insert into sqlite database | |
def setup_schema(conn):
    """
    Create the demonstration tables when they do not exist yet.

    Args:
        conn - sqlite or other db connection with .execute() and .commit()

    Executes each embedded DDL statement on conn, then commits.
    Note: syntax is valid SQLite3, unknown compatibility with other databases.
    """
    shipments_ddl = """
        CREATE TABLE IF NOT EXISTS known_shipments(
            pro TEXT UNIQUE ON CONFLICT REPLACE,
            status TEXT,
            longstatus TEXT,
            delivery_time TEXT,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
        );
    """
    images_ddl = """
        CREATE TABLE IF NOT EXISTS shipment_images(
            known_pro TEXT REFERENCES known_shipments(pro) ON DELETE CASCADE,
            image_identifier TEXT,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
            image_data BLOB,
            UNIQUE (known_pro, image_identifier)
                ON CONFLICT REPLACE
        );
    """
    # shipments first: the images table references it
    for statement in (shipments_ddl, images_ddl):
        conn.execute(statement)
    conn.commit()
def insert_data(conn, table: str, data: dict):
    """
    Insert one row into a table and commit.

    Args:
        conn - sqlite or other db connection with .execute() and .commit()
        table - what table to insert into
        data - column name -> value mapping for the row to insert

    Values are bound through '?' placeholders, which may be compatible with
    other database client libraries. The table and column names are placed
    into the statement text directly, so they must come from trusted code.
    """
    names = list(data)
    placeholders = ",".join("?" for _ in names)
    statement = (
        f"INSERT INTO {table}({','.join(names)}) VALUES ({placeholders});"
    )
    conn.execute(statement, [data[name] for name in names])
    conn.commit()
def get_status_to_db(conn, pro: str) -> dict:
    """
    Fetch the status of a shipment and record it in known_shipments.

    Args:
        conn - sqlite3 connection object
        pro - pro to get the status for

    When the API answer contains "errors" the database is left untouched;
    otherwise the pro, status, longstatus and delivery_time fields are
    inserted into the table named known_shipments.

    Returns:
        get_status() call results for testing / verification
    """
    result = get_status(pro)
    if "errors" not in result:
        row = {}
        for column in ("pro", "status", "longstatus", "delivery_time"):
            row[column] = result[column]
        insert_data(conn, "known_shipments", row)
    return result
def get_images_to_db(conn, pro: str):
    """
    Download the proof images of a shipment and store them in the database.

    Args:
        conn - sqlite3 connection object
        pro - pro to get images for

    Inserts all images for the given pro into the database specified,
    into the table named shipment_images. The image files written by
    get_individual_images() are temporary and are removed again, also when
    reading or inserting one of them fails.
    """
    image_names = get_individual_images(pro, "proof")
    try:
        for image_name in image_names:
            with open(image_name, "rb") as img:
                # If you wanted to use an object store instead,
                # change this code.
                img_data = img.read()
            insert_data(
                conn,
                "shipment_images",
                {
                    "known_pro": pro,
                    "image_identifier": image_name,
                    "image_data": img_data,
                },
            )
    finally:
        # remember to delete the local temporary files, including the ones
        # not reached because of an earlier failure
        for image_name in image_names:
            try:
                os.unlink(image_name)
            except FileNotFoundError:
                # already gone, nothing left to clean up
                pass
def get_all_to_db(conn, pro: str):
    """
    Store both the status and the images of a shipment.

    Args:
        conn - sqlite3 connection object
        pro - pro to get status and images for

    Calls get_status_to_db() first; unless that reports errors,
    get_images_to_db() is called afterwards.
    """
    outcome = get_status_to_db(conn, pro)
    if "errors" in outcome:
        return
    # can get any uploaded images any time there isn't an error
    get_images_to_db(conn, pro)
| # --- handle webhook requests generically | |
def handle_request(
    conn, request_body: Union[str, bytes], body_base64_encoded: bool
) -> str:
    """
    Decode a webhook POST body and dispatch it to the matching handler.

    Args:
        conn - database connection
        request_body - the body of the http(s) request
        body_base64_encoded - true if the body was base64 encoded, and must
            be decoded

    Handles "start", "image", and "end" webhook requests, inserting their
    results into our demonstration sqlite db.

    Returns one of:
        "ok"
        "error-body" when a base64 encoded body cannot be decoded
        "error-ref" / "error-what" when that form field is missing
        "error-unknown-" followed by the unrecognised "what" value
        whatever handle_image() / handle_end() return
    """
    # decode the post body
    if body_base64_encoded:
        try:
            request_body = base64.b64decode(
                request_body
                if isinstance(request_body, bytes)
                else request_body.encode("latin-1")
            )
        except ValueError:
            # binascii.Error and UnicodeEncodeError are both ValueErrors;
            # report a bad payload like every other problem instead of
            # letting the exception escape the webhook handler
            return "error-body"
    body = (
        request_body
        if isinstance(request_body, str)
        else request_body.decode("latin-1")
    )

    # parse the post body
    post_data = urllib.parse.parse_qs(body)
    for it in ("ref", "what"):
        if not post_data.get(it):
            return f"error-{it}"
    ref = post_data["ref"][0]
    what = post_data["what"][0]

    # current UTC time, second resolution, with an explicit +0000 suffix
    now_dt = datetime.datetime.now(datetime.timezone.utc)
    now = now_dt.replace(microsecond=0, tzinfo=None).isoformat()
    now += "+0000"

    # dispatch
    if what == "start":
        handle_start(conn, now, ref)
        return "ok"
    if what == "image":
        return handle_image(conn, post_data, now, ref)
    if what == "end":
        return handle_end(conn, post_data, now, ref)
    return "error-unknown-" + what
def handle_start(conn, now: str, ref: str):
    """
    Make sure a shipment row exists for ref.

    Args:
        conn - database connection
        now - utcnow string
        ref - pro, tracking, or reference number for the shipment

    Does nothing when known_shipments already has a row for ref. Otherwise
    a row is added with status QR_SCANNED and now as its delivery_time.
    """
    matches = conn.execute(
        "SELECT pro FROM known_shipments WHERE pro = ?", [ref]
    ).fetchall()
    if matches:
        # don't re-insert
        return

    new_row = {
        "pro": ref,
        "status": "QR_SCANNED",
        "longstatus": "QR Code was scanned",
        "delivery_time": now,
    }
    insert_data(conn, "known_shipments", new_row)
def handle_image(conn, post_data: dict, now: str, ref: str) -> str:
    """
    Store the image carried by an "image" webhook request.

    Args:
        conn - database connection
        post_data - {"name": ["val"], ...}
        now - utcnow string
        ref - pro, tracking, or reference number for the shipment

    If "ok" is returned, will have processed and inserted the image in the
    post data to the database, under the full ref as known_pro.

    Returns one of:
        "ok"
        "error-filename" / "error-image" when that form field is missing
        "error-image-data" when the image is not a base64 jpeg data URL
    """
    for it in ("filename", "image"):
        if not post_data.get(it):
            return f"error-{it}"

    image = post_data["image"][0]
    if not image.startswith("data:image/jpeg;base64,"):
        return "error-image-data"
    try:
        image_bytes = base64.b64decode(image.partition(",")[-1].encode())
    except ValueError:
        # binascii.Error (bad base64) is a ValueError.
        # POST body should support at least up to 418,000 bytes to
        # receive image data. If your platform encodes base64 as handled
        # by handle_request(), then you should ensure your post data limit
        # is at least 558,000 bytes.
        return "error-image-data"

    # ensure foreign key exists
    handle_start(conn, now, ref)

    # per the original notes, the filename matches the names returned by
    # get_individual_images()
    filename = post_data["filename"][0]
    insert_data(
        conn,
        "shipment_images",
        {
            # ref is already the reference string; indexing it would keep
            # only its first character and break the link to the shipment
            "known_pro": ref,
            "image_identifier": filename,
            "image_data": image_bytes,
        },
    )
    return "ok"
def handle_end(conn, post_data: dict, now: str, ref: str) -> str:
    """
    Record the final status delivered by an "end" webhook request.

    Args:
        conn - database connection
        post_data - {"name": ["val"], ...}
        now - utcnow string
        ref - pro, tracking, or reference number for the shipment

    If "ok" is returned, will have updated the status of the provided
    shipment to whatever was provided in the post_data.

    Returns one of:
        "ok"
        "error-longstatus" when that form field is missing
    """
    values = post_data.get("longstatus")
    if not values:
        return "error-longstatus"

    # text before the first " - " is the short status, the rest the long
    # one; this matches what is returned by get_status()
    short_status, _sep, long_status = values[0].partition(" - ")

    # ensure the row exists
    handle_start(conn, now, ref)

    # update the row
    update_sql = """
        UPDATE known_shipments
        SET status = ?, longstatus = ?
        WHERE pro = ?
    """
    conn.execute(update_sql, [short_status, long_status, ref])
    conn.commit()
    return "ok"
def main():
    """
    Command line interface for the example client.

    Either dispatches a stored webhook request through handle_request()
    (--dispatch, requires --database), or fetches the status and proof
    images of every given pro, storing them in a sqlite database
    (--database) or as files inside --dirname (--files).

    Exits with status 1 when --dispatch is used without --database.
    """
    import argparse
    import importlib
    import tempfile

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--verbose",
        default=False,
        action="store_true",
        help="Provide to receive additional output",
    )
    parser.add_argument(
        "--creds",
        default=None,
        help="The 'package.module.attribute' to use for your API Key",
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--database",
        default=False,
        action="store_true",
        help="Provide to store in a local sqlite db, see --sqlite-file for name",
    )
    group.add_argument(
        "--files",
        default=False,
        action="store_true",
        help="Provide to store in individual .jpg and .json files, see --dirname for name",
    )
    parser.add_argument(
        "--sqlite-file",
        default="finalmile_test.sqlite3",
        help="Sqlite database to store our data to",
    )
    parser.add_argument(
        "--dirname",
        default=tempfile.gettempdir(),
        help="Where to store downloaded files",
    )
    group2 = parser.add_mutually_exclusive_group(required=True)
    group2.add_argument(
        "--dispatch",
        help="The filename of the json-encoded [request_body, is_base64_encoded] stored inside for dispatching via handle_request() (requires --database)",
    )
    group2.add_argument(
        "pro",
        nargs="*",
        default=[],
        help="The tracking number, pro, or reference number for your shipment",
    )
    args = parser.parse_args()

    if args.dispatch and not args.database:
        print("--dispatch requires --database")
        # exit() is only injected by the site module; SystemExit always works
        raise SystemExit(1)

    if args.creds:
        module, _, attribute = args.creds.rpartition(".")
        if not module:
            parser.error("--creds must look like 'package.module.attribute'")
        # Import by the full dotted path. The second argument of
        # import_module() only anchors relative names, so splitting off the
        # package would import a top-level module of the same name instead.
        mod = importlib.import_module(module)
        settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY = getattr(mod, attribute)
        if args.verbose:
            print("Used API Key from", args.creds)
    elif (
        args.verbose
        and settings.KNOWN_SANDBOX_KEY
        == settings.LIMINAL_NETWORK_FINAL_MILE_API_KEY
    ):
        print("Warning: using SANDBOX API Key")

    if args.database:
        if args.verbose:
            print("Opening database and ensuring schema validity")
        sq_conn = sqlite3.connect(args.sqlite_file)
        try:
            setup_schema(sq_conn)
            if args.dispatch:
                if args.verbose:
                    print("Opening", args.dispatch, "for incoming request data")
                with open(args.dispatch, "r") as inp:
                    body, is_base64 = json.load(inp)
                resp = handle_request(sq_conn, body, is_base64)
                print("Response to request data:", resp)
            else:
                for pro in args.pro:
                    if args.verbose:
                        print("Fetching", pro)
                    get_all_to_db(sq_conn, pro)
        finally:
            # close the connection even when a fetch or dispatch fails
            sq_conn.close()
        return

    # output to files
    cwd = os.getcwd()
    try:
        # the download helpers write into the current working directory
        os.chdir(args.dirname)
        for pro in args.pro:
            if args.verbose:
                print("Fetching", pro)
            status = get_status(pro)
            # the API reports problems under the "errors" key
            if "errors" not in status:
                with open(f"{pro}.json", "w") as out:
                    json.dump(status, out)
                get_individual_images(pro, "proof")
            elif args.verbose:
                print("Shipment had error:", status)
    finally:
        os.chdir(cwd)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment