Skip to content

Instantly share code, notes, and snippets.

@mmeisinger
Last active January 28, 2016 18:04
Show Gist options
  • Save mmeisinger/a1b57ac529f617950ef1 to your computer and use it in GitHub Desktop.
Save mmeisinger/a1b57ac529f617950ef1 to your computer and use it in GitHub Desktop.
File upload to AgXchange
===========================================================
file_upload.py
Script to upload files to AgXchange(tm).
===========================================================
PREREQUISITES
=============
Python 2.7.x installed
pip installed
virtualenv suggested
AgXchange credentials
Network access to AgXchange system
INSTALL
=======
Activate virtualenv of your choice
> source venv/bin/activate
Install package dependencies
> pip install -r requirements.txt
USAGE
=====
Run the script and show arguments:
> python file_upload.py --help
Examples:
> python file_upload.py -u myuser@email.com -p passwd -f ~/Documents/myfile.pdf -n "My File.pdf"
> python file_upload.py -u myuser@email.com -p passwd -f ~/Documents/myfile.pdf -n "My File.pdf" -l "Documents/MyFolder"
#!/usr/bin/env python
"""
file_upload.py
(c) AgPro Exchange, 2016
Given the URL of the AgXchange system, a set of credentials, a local file name and a target file
name, uploads the file to AgXchange by mirroring the actions the AgXchange UI client performs.
The upload involves an AgXchange login, AgXchange API calls and a direct file upload to AWS S3.
"""
import argparse
import json
import mimetypes
import os
import requests
import sys
import time
# Default base URL of the AgXchange system; used when -s/--system_url is not given.
DEFAULT_SYS_URL = "https://agxchange.com"
# OAuth2 client_id sent with the password-grant token request during login.
DEFAULT_AGX_CLIENT_ID = "agprox_ui"
def main():
parser = argparse.ArgumentParser(description="AgXchange file upload")
parser.add_argument('-s', '--system_url', type=str, help='AgXchange URL', default=DEFAULT_SYS_URL)
parser.add_argument('-u', '--username', type=str, help='AgXchange username/email', default="")
parser.add_argument('-p', '--password', type=str, help='AgXchange password', default="")
parser.add_argument('-f', '--filepath', type=str, help='Path to a local file', default="")
parser.add_argument('-n', '--filename', type=str, help='Alternative name of file on AgXchange', default="")
parser.add_argument('-d', '--filedesc', type=str, help='Description for file on AgXchange', default="")
parser.add_argument('-l', '--location', type=str, help='Target folder on AgXchange', default="")
opts, extra = parser.parse_known_args()
print "AgXchange(tm) file uploader"
print "==========================="
fu = FileUploader(opts)
fu.agx_auth()
fu.upload_file()
def errout(message):
    """Print *message* prefixed with "ERROR: " and terminate with exit status 1."""
    report = "ERROR: %s" % message
    print(report)
    raise SystemExit(1)
class FileUploader(object):
    """Uploads a single local file to AgXchange.

    Typical use: construct with the parsed command-line options, call
    agx_auth() to log in, then upload_file() to push the file to AWS S3 and
    register it with AgXchange. Every failure is reported through errout(),
    which terminates the process.
    """

    def __init__(self, opts):
        """Store the upload options and create the HTTP session.

        Args:
            opts: Object with the attributes system_url, username, password,
                filepath, location, filename and filedesc (the argparse
                namespace built by main()).
        """
        self.agx_base_url = opts.system_url or DEFAULT_SYS_URL
        # NOTE: this ends with "/" and the service paths below start with
        # "/", so request URLs contain "//". Kept unchanged on purpose.
        self.agx_rest_url = self.agx_base_url + "/service/"
        self.username = opts.username
        self.password = opts.password
        self.filename = opts.filepath
        self.target_loc = opts.location
        self.target_name = opts.filename
        self.target_desc = opts.filedesc
        self.session = requests.session()
        # Filled in by agx_auth().
        self.auth_info = None
        self.auth_headers = None
        self.actor_id = None

    def _parse_json(self, resp, what):
        """Return the decoded JSON body of resp, or exit if it is not JSON."""
        try:
            return resp.json()
        except ValueError:
            errout("%s returned a non-JSON response" % what)

    def _call_service(self, operation, service_args):
        """POST service_args to the agpro_exchange operation; return the response."""
        service_url = self.agx_rest_url + "/request/agpro_exchange/" + operation
        payload = {"data": json.dumps({"params": service_args})}
        return self.session.post(service_url, data=payload)

    def _service_result(self, resp):
        """Return the "result" member of a service response, or exit if absent."""
        resp_json = self._parse_json(resp, "AgXchange service")
        if "result" not in resp_json:
            errout("Could not obtain service result")
        return resp_json["result"]

    def _define_asset(self, name, mimetype, size, s3_url, parent_id):
        """Ask AgXchange to register the uploaded S3 object under name."""
        service_args = dict(name=name, description=self.target_desc,
                            content_type=mimetype, content_length=size,
                            attributes=dict(s3_url=s3_url),
                            parent_id=parent_id)
        return self._call_service("define_information_asset", service_args)

    def agx_auth(self):
        """Verify the AgXchange URL, log in via OAuth2 and fetch the actor id.

        Sets self.auth_info, self.auth_headers and self.actor_id. Exits via
        errout() if any of the three requests fails or returns unexpected data.
        """
        # Check URL
        resp = self.session.get(self.agx_rest_url + "/")
        if resp.status_code != 200:
            errout("AgXchange URL not valid: " + resp.text)
        resp_json = self._parse_json(resp, "AgXchange service endpoint")
        if "result" not in resp_json or "service_gateway/ScionCC" not in resp_json["result"]:
            errout("Unexpected server endpoint")
        print("AgXchange URL '%s' verified." % self.agx_base_url)

        # Authenticate using OAuth2
        auth_params = {"client_id": DEFAULT_AGX_CLIENT_ID, "grant_type": "password",
                       "username": self.username, "password": self.password}
        resp = self.session.post(self.agx_base_url + "/oauth/token", data=auth_params)
        if resp.status_code != 200:
            errout("Could not access AgXchange auth endpoint: " + resp.text)
        resp_json = self._parse_json(resp, "AgXchange auth endpoint")
        if "access_token" not in resp_json:
            errout("Could not obtain access token")
        self.auth_info = resp_json
        self.auth_headers = {"Authorization": "Bearer %s" % resp_json["access_token"]}

        # Get user session info
        resp = self.session.get(self.agx_base_url + "/auth/session", headers=self.auth_headers)
        if resp.status_code != 200:
            errout("Could not get user session: " + resp.text)
        resp_json = self._parse_json(resp, "AgXchange session endpoint")
        if "result" not in resp_json or "actor_id" not in resp_json["result"]:
            errout("Could not get actor_id from session")
        self.actor_id = resp_json["result"]["actor_id"]
        print("Authenticated with AgXchange as user '%s'." % self.username)

    def upload_file(self):
        """Upload self.filename to AWS S3 and register it with AgXchange.

        Requires a prior successful agx_auth(). Steps: obtain a signed S3
        request, resolve the optional target folder, PUT the file content to
        S3, then register the asset. If registration answers 409, it is
        retried once with a timestamp appended to the name. Exits via
        errout() on any failure.
        """
        if not self.actor_id:
            errout("Not authenticated with AgXchange; call agx_auth() first")
        if not os.path.exists(self.filename):
            errout("File to upload does not exist: %s" % self.filename)
        if not os.path.isfile(self.filename):
            # A directory passes the exists() check but cannot be read.
            errout("Path to upload is not a regular file: %s" % self.filename)

        # Get S3 direct upload URL
        base_filename = os.path.split(self.filename)[1]
        target_filename = self.escape_filename(self.target_name or base_filename)
        if not target_filename:
            errout("Target filename invalid")
        timestamp = str(int(time.time()) * 1000)
        s3_object_name = ("/home/" + self.actor_id + "/files/" + self.actor_id +
                          '.' + target_filename + timestamp)
        # guess_type() returns None for unknown extensions; use the generic
        # binary type so the signed request and the PUT agree on a value.
        file_mimetype = mimetypes.guess_type(self.filename)[0] or "application/octet-stream"
        service_args = dict(s3_object_type=file_mimetype, s3_object_name=s3_object_name)
        resp = self._call_service("get_signed_s3_request", service_args)
        if resp.status_code != 200:
            errout("Could not get AWS S3 signed URL: " + resp.text)
        signed_request = self._service_result(resp)
        # Check both members now so a malformed reply fails before the upload.
        for key in ("signed_request", "url"):
            if not isinstance(signed_request, dict) or key not in signed_request:
                errout("Signed S3 request is missing '%s'" % key)

        # Find target folder on AgXchange (before upload)
        parent_id = ""
        if self.target_loc:
            parent_id = self.get_parent_id(self.target_loc)
            if parent_id:
                print("Found target folder '%s' on AgXchange." % self.target_loc)
            else:
                errout("Could not find target location '%s' on AgXchange" % self.target_loc)

        # Upload to S3
        signed_s3_url = signed_request["signed_request"]
        aws_session = requests.session()
        aws_headers = {"Access-Control-Request-Headers": "content-type, x-amz-acl",
                       "Access-Control-Request-Method": "PUT",
                       "Origin": self.agx_base_url}
        resp = aws_session.options(signed_s3_url, headers=aws_headers)
        if resp.status_code != 200:
            errout("AWS S3 did not accept signed URL: " + resp.text)
        print("AWS S3 signed request accepted.")

        print("Reading file '%s'" % self.filename)
        with open(self.filename, "rb") as f:
            file_data = f.read()
        file_size = len(file_data)
        print("Found file '%s': (length=%s, type=%s)." % (base_filename, file_size, file_mimetype))

        print("Attempting file upload to AWS S3...")
        aws_headers = {"Origin": self.agx_base_url,
                       "Content-type": file_mimetype}
        resp = aws_session.put(signed_s3_url, headers=aws_headers, data=file_data)
        if resp.status_code != 200:
            errout("AWS S3 upload not successful: " + resp.text)
        print("File uploaded to AWS S3.")

        # Register file with AgXchange
        s3_url = signed_request["url"]
        resp = self._define_asset(target_filename, file_mimetype, file_size, s3_url, parent_id)
        if resp.status_code == 409:
            # Name conflict: retry once under a timestamp-suffixed name.
            target_filename = target_filename + "_" + timestamp
            print("File already exists on AgXchange - renaming to: '%s'" % target_filename)
            resp = self._define_asset(target_filename, file_mimetype, file_size, s3_url, parent_id)
        if resp.status_code != 200:
            errout("Could not register file with AgXchange: " + resp.text)
        self._service_result(resp)
        print("File registered with AgXchange as '%s/%s'." % (self.target_loc or "(default)", target_filename))

    def get_parent_id(self, target_loc):
        """Resolve a "/"-separated folder path to the id of its last folder.

        Walks the path one component at a time, listing the folders below the
        id found so far. Empty components (leading, trailing or doubled
        slashes) are ignored.

        Args:
            target_loc: Folder path on AgXchange, e.g. "Documents/MyFolder".

        Returns:
            The "_id" of the last folder, or "" if the path has no components.
            Exits via errout() when a component is missing or is not a folder.
        """
        target_parts = [part for part in target_loc.split("/") if part]
        current_parent_id = ""
        current_path = ""
        for dirname in target_parts:
            current_path = current_path + "/" + dirname
            service_args = dict(parent_id=current_parent_id, per_page=1000, is_dir=True)
            resp = self._call_service("find_information_assets_ui", service_args)
            if resp.status_code != 200:
                errout("Could not find location '%s' in AgXchange" % current_path)
            file_info = self._service_result(resp)

            found_id = None
            # No break: as before, the last entry with a matching name wins.
            for fi in file_info.get("items", []):
                if fi["text"] == dirname:
                    if fi["is_dir"] is not True:
                        errout("Target location '%s' is not a folder" % dirname)
                    found_id = fi["_id"]
            if not found_id:
                errout("Target location '%s' not found" % current_path)
            current_parent_id = found_id
        return current_parent_id

    def escape_filename(self, name):
        """Return name reduced to letters, digits and "-_.() ", stripped of outer whitespace."""
        VALID_CHARS = "-_.() "
        return str(''.join(c for c in name if c.isalpha() or c.isdigit() or c in VALID_CHARS)).strip()
if __name__ == "__main__":
    # Run the uploader only when executed as a script, not when imported.
    # main() returns None, so this exits with status 0 after a normal run.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment