Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Read in a CSV file and POST the data to an API Endpoint
#!/usr/bin/python
import argparse
import csv
import json
import os
import requests
import sys
from requests.auth import HTTPBasicAuth
from urllib.parse import urlparse
def main():
"""
Load from CSV via REST API
https://<host>:<port>/api/endpoint
:return response
"""
# set up the argument parser
parser = argparse.ArgumentParser(description="Get the required command line arguments for the script")
parser.add_argument("--username", type=str, required=True, help="Username")
parser.add_argument("--password", type=str, required=True, help="Password")
parser.add_argument("--api_path", type=str, required=True, help="API URL and path")
parser.add_argument("--filename", type=str, required=True, help="The absolute CSV file path and name")
# parse the arguments
args = parser.parse_args()
# parse the url
base_url = urlparse(args.api_path)
# split the csv file name from the file path
file_path = os.path.dirname(args.filename)
csv_filename = os.path.split(args.filename)[1]
# headers and auth
hdrs = {"Content-Type": "application/json"}
auth = HTTPBasicAuth(args.username, args.password)
params = None
api_methods = ["GET", "POST", "PUT", "DELETE"]
# check directory and filename exists
dirs = os.listdir(path=file_path)
if csv_filename not in dirs:
print("The filename {} not found in the path's directory: {}.".format(str(csv_filename), str(file_path)))
sys.exit(1)
# check arg values
if all([base_url.scheme, base_url.netloc, base_url.path]):
if "http" in base_url.scheme:
if "api" in base_url.path:
try:
with open(args.filename, "r", newline="") as csv_file:
csv_header = csv.Sniffer().has_header(csv_file.read(1024))
csv_file.seek(0)
reader = csv.reader(csv_file, delimiter=",")
if csv_header:
next(reader)
# loop the csv data
for row in reader:
# build the request body json obj from the csv data
payload = [{
"id": int(row[0]),
"name": row[1],
"description": row[2],
"ip_address": row[3],
"cidr": row[4],
"domain_name": int(row[5]),
"location": {
"name": row[6],
"latlong": [float(row[7]), float(row[8])]
},
"country_code": row[9]
}]
# encode the request body data
data = json.dumps(payload).encode("utf-8")
url = base_url.geturl()
# post the json data to endpoint
try:
r = requests.request(
api_methods[2],
url,
data=data,
auth=auth,
headers=hdrs,
verify=False
)
if r.status_code == 200:
resp = r.json()
print("Status Code: {}".format(str(r.status_code)))
print("Response Headers: {}".format(r.headers))
print("Response Body: {}".format(resp))
else:
print("API call returned HTTP Status code: {}".format(str(r.status_code)))
except requests.HTTPError as http_err:
print("HTTP Error: {}".format(str(http_err)))
except IOError as err:
print("Can not access the filename at the specified file path: {} Error: {}".format(str(args.filename), str(err)))
sys.exit(1)
else:
print("URL Parser could not find 'api' in the URL: {}, exiting".format(str(args.api_path)))
sys.exit(1)
else:
print("URL Parser could not find 'http' or 'https' in the api_path argument: {}".format(str(args.api_path)))
sys.exit(1)
else:
print("Can not parse the URL from the command line argument for api_path: {}".format(str(args.api_path)))
sys.exit(1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.