Skip to content

Instantly share code, notes, and snippets.

@ecmonsen
Created October 3, 2023 23:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ecmonsen/76759c5ab42a1973ef2dac7668bfe883 to your computer and use it in GitHub Desktop.
Save ecmonsen/76759c5ab42a1973ef2dac7668bfe883 to your computer and use it in GitHub Desktop.
Pseudo-python in response to a recent interview question.
"""
Build the start of an e2e pipeline designed to be robust, extensible and scalable.
Approach and structure is open ended.
Use any packages you like but ensure code is as close to executable as possible.
Input
API endpoint = "testurl.com/endpoint"
- json response that contains some IDs, a description of that ID, and the most
recent modification date for that id’s description.
Has three columns:
Ex: [{“id”: “hello world”, “description”: “lorem”, “timestamp”: “ipsum”}, {}, …]
id
description
timestamp
Output
./output/<filename>.csv
- A flat file that contains all of the records of the API call and has these four columns:
id
description
timestamp
ingestion_time - time at which you call the API
"""
from datetime import datetime
import requests
import csv
import json
import argparse
import os
import logging
logger = logging.getLogger("pipeline")
def pipeline(endpoint, output_dir, fields : Iterable):
# endpoint = "testurl.com/endpoint"
# output_path = './output/<filename>.csv'
dt = datetime.now()
output_path = os.path.join(output_dir, f"table_{dt}.csv")
try:
response_body = requests.get(endpoint)
j = json.read(response_body)
with open(output_path, 'w') as f:
csv_writer = csv.writer(output_path)
csv.write(fields)
for row in j:
csv.write([row.get(field, "") for field in fields] + [str(dt)])
except JsonError e:
logger.error("error reading response json")
except HttpError e:
if e.status >= 500:
retry() # TODO
elif e.status >= 400:
logger.error(e)
raise e
DEFAULT_ENDPOINT=="testurl.com/endpoint"
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT)
parser.add_argument("output_path")
args = parser.parse_args()
pipeline(args.endpoint, args.output_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment