Skip to content

Instantly share code, notes, and snippets.

@maanenson
Last active September 17, 2021 20:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maanenson/c662a40aa8a5358a422837e89e603847 to your computer and use it in GitHub Desktop.
Save maanenson/c662a40aa8a5358a422837e89e603847 to your computer and use it in GitHub Desktop.
Example Python script to demonstrate using ExoSense public API to list and download asset content files
#
# Example script that uses the ExoSense GraphQL API to read an Asset's information,
# list the content available for the Asset, and download each content file.
# Notes:
# - Asset Content files uploaded by the API or by Users (the application) are
# queried differently from content files uploaded by Devices (associated with
# the Asset)
# - This is an example: error handling is minimal and not every unexpected response is handled
#
import requests
import json
# API Token
API_TOKEN = "<TOKEN_HERE>" # API TOKEN HERE
EXOSENSE_DOMAIN = "<EXOSENSE_URL_HERE>" #example https://example.apps.exosite.io
ASSET_ID = "<ASSET_ID_HERE>"
url = f"{EXOSENSE_DOMAIN}/api/graphql"
headers={'Authorization': f'Automation {API_TOKEN}'}
# Seconds to wait for the server on any single HTTP call before giving up,
# so that a stalled connection cannot hang the script forever.
REQUEST_TIMEOUT = 30

#
# Get a specific Asset's ID, meta, description, and a list of signal channels
# to find what devices are associated
#
ASSET_QUERY = """query asset($assetId: ID!) {
  asset(id: $assetId) {
    id
    name
    meta
    description
    signals {
      # id
      # name
      channel {
        id
        #properties
      }
    }
  }
}"""

#
# List Asset content. The same query serves both listings; only the variables
# differ (see build_content_list_variables).
#
ASSET_CONTENTS_QUERY = """query assetContents($filters: ContentFilters, $pagination: Pagination, $id: [ID!], $devices: [AssetContentsDeviceInput]) {
  assetContents(filters: $filters, pagination: $pagination, id: $id, devices: $devices) {
    assetContents {
      id
      name
      associations { type id }
      tags { name value }
      contentType
      createdBy {
        ... on Device {
          pid
          identity
        }
        ... on User {
          name
        }
      }
      type
      lastModified
      length
    }
    totalCount
  }
}"""

# Get URL link to download a content file that a User/API has uploaded
CONTENT_DOWNLOAD_QUERY = """query ContentDownload($id: ID!) {
  content(id: $id) {
    id
    url
    name
    lastModified
  }
}"""

# Get URL link to download a content file that a Device has uploaded
DEVICE_CONTENT_DOWNLOAD_QUERY = """query DeviceAssetContent($input: DeviceAssetContentInput!) {
  deviceAssetContent(input: $input) {
    url
  }
}"""


def extract_data(payload):
    """Return the ``data`` member of a decoded GraphQL response, or None.

    Any ``errors`` member is printed. A GraphQL server can answer HTTP 200
    with ``errors`` set and ``data`` null, so callers must check the result
    before indexing into it.
    """
    if not isinstance(payload, dict):
        return None
    errors = payload.get('errors')
    if errors:
        print(f"GraphQL errors: {json.dumps(errors, indent=2)}")
    return payload.get('data')


def graphql_request(query, variables):
    """POST one GraphQL query to ``url`` and return its ``data`` dict.

    Returns None (after printing the reason) when the HTTP status is not 200,
    when the body is not JSON, or when the response carries no data.
    """
    r = requests.post(
        url,
        headers=headers,
        json={'query': query, 'variables': variables},
        timeout=REQUEST_TIMEOUT,
    )
    if r.status_code != 200:
        print(r.status_code, r.reason)
        return None
    try:
        payload = json.loads(r.text)
    except ValueError:
        print("Response body was not valid JSON")
        return None
    return extract_data(payload)


def extract_devices(signals):
    """Return the unique device sources referenced by an Asset's signals.

    Each channel id is split on "." and its first two parts are taken as the
    product id and device id (ProductID.DeviceID). Signals without a channel,
    or with an id that has fewer than two parts, are skipped instead of
    raising. First-seen order is preserved.
    """
    devices = []
    seen = set()
    for signal in signals or []:
        channel = (signal or {}).get('channel') or {}
        parts = (channel.get('id') or "").split(".")
        if len(parts) < 2:
            continue
        key = (parts[0], parts[1])
        if key in seen:
            continue
        seen.add(key)
        devices.append({"product_id": parts[0], "device_id": parts[1]})
    return devices


def build_content_list_variables(devices=None):
    """Build the variables for ASSET_CONTENTS_QUERY.

    With ``devices`` left as None the variables list content uploaded by the
    API or a user; with a list of ``{"product_id", "device_id"}`` dicts they
    list content uploaded by those devices.

    NOTE: only one page is requested (limit 50, offset 0).
    """
    variables = {
        "filters": {
            "contentTypes": [],
            "orderBy": "updated_at",
            "sort": "desc",
            "association": {
                "type": "asset",
                "id": ASSET_ID,
            },
        },
        "pagination": {
            "limit": 50,
            "offset": 0,
        },
        "devices": devices,
    }
    if devices is not None:
        # The device listing also sends an explicit null "id".
        variables["id"] = None
    return variables


def list_asset_content(heading, variables):
    """Run the content listing, print each entry, and return the entries."""
    data = graphql_request(ASSET_CONTENTS_QUERY, variables)
    listing = (data or {}).get('assetContents')
    if listing is None:
        return []
    print(heading)
    found = []
    for content in listing.get('assetContents') or []:
        found.append(content)
        print(f"======\nFile Name: {content['name']}\nFile Type: {content['type']}\nCreated By: {json.dumps(content['createdBy'],indent=2)}")
    return found


def safe_file_name(name):
    """Return ``name`` with path separators replaced by "_".

    Content names come from the server and may contain "/" (or "\\"), which
    would otherwise point outside the current directory or at a directory
    that does not exist.
    """
    return name.replace('/', '_').replace('\\', '_')


def download_file(content_url, name):
    """Download ``content_url`` into the current directory.

    The file is named after ``name`` (sanitised). Returns True when the file
    was written, False when the server did not answer 200; in that case
    nothing is written, so an error page is never saved as file content.
    """
    print(f"DOWNLOADING {name} START")
    resp = requests.get(content_url, timeout=REQUEST_TIMEOUT)
    if resp.status_code != 200:
        print(resp.status_code, resp.reason)
        return False
    # The context manager closes the file even if the write fails.
    with open(safe_file_name(name), 'wb') as fd:
        fd.write(resp.content)
    print(f"DOWNLOADING {name} COMPLETED")
    return True


def download_user_content(content):
    """Look up the download URL of user/API uploaded content and fetch it."""
    data = graphql_request(CONTENT_DOWNLOAD_QUERY, {"id": content['id']})
    info = (data or {}).get('content') or {}
    content_url = info.get('url')
    if not content_url:
        print(f"No download URL returned for {content['name']}")
        return
    print(f"======\nFile: {content['name']}\n{content['type']}\nURL: {content_url}\n=====")
    download_file(content_url, content['name'])


def download_device_content(content):
    """Look up the download URL of device uploaded content and fetch it."""
    creator = content.get('createdBy') or {}
    if 'pid' not in creator or 'identity' not in creator:
        # Only the Device variant of createdBy carries pid/identity.
        print(f"Skipping {content['name']}: not created by a device")
        return
    variables = {
        "input": {
            "id": content['id'],
            "product_id": creator['pid'],
            "device_id": creator['identity'],
        }
    }
    data = graphql_request(DEVICE_CONTENT_DOWNLOAD_QUERY, variables)
    info = (data or {}).get('deviceAssetContent') or {}
    content_url = info.get('url')
    if not content_url:
        print(f"No download URL returned for {content['name']}")
        return
    print(f"======\nFile: {content['name']}\n{content['type']}\nURL: {content_url}\n======")
    download_file(content_url, content['name'])


def main():
    """Read the Asset, list its content, and download every content file."""
    data = graphql_request(ASSET_QUERY, {"assetId": ASSET_ID})
    asset = (data or {}).get('asset')
    if not asset:
        print(f"No asset data returned for asset id {ASSET_ID}")
        return
    print(f"Asset Name: {asset['name']}\nAsset ID: {asset['id']}\nMeta: {json.dumps(asset['meta'],indent=2)}")

    asset_devices = extract_devices(asset.get('signals'))
    print("Asset Device Sources (ProductID.DeviceID):")
    for device in asset_devices:
        print(f"-> {device}")

    # Content uploaded by the API or a user
    asset_content = list_asset_content(
        "Asset Content (API or User Uploaded):__",
        build_content_list_variables(),
    )
    # Content uploaded by the devices behind the Asset's signals
    asset_device_content = list_asset_content(
        "Asset Content (Device Uploaded):__",
        build_content_list_variables(asset_devices),
    )

    for content in asset_content:
        download_user_content(content)
    for content in asset_device_content:
        download_device_content(content)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment