Skip to content

Instantly share code, notes, and snippets.

@daigles
Last active May 16, 2023 06:20
Show Gist options
  • Save daigles/ff958b8b3ed695329d371e5d500acb0a to your computer and use it in GitHub Desktop.
Save daigles/ff958b8b3ed695329d371e5d500acb0a to your computer and use it in GitHub Desktop.
import datetime
import io
import os
import requests
import shutil
import time
class ShotgunMediaIO:
# Note: chunk_size must be greater than 5,000,000
# This is the byte size at which file uploads will switch over to mulitpart uploads.
chunk_size = 10000000
@staticmethod
def is_transient(url):
if not url:
return False
return "/images/status/transient/" in url
# Creates a MediaIO object.
# @param host [String] base URL of the Shotgun site
# @param credentials [Hash] credentials to use to access the site's API endpoint
# @option credentials [String] 'login' User login. To be used with 'password'
# @option credentials [String] 'password' user password
# @option credentials [String] 'client_id' api script name. To be used with 'client_secret'
# @option credentials [String] 'client_secret' api script secret
def __init__(self, host, credentials, debug = False):
self.host = host
self.credentials = credentials
self.access_token = None
self.refresh_token = None
self.access_expires_at = None
self.refresh_expires_at = None
self.debug = debug
# Upload a file to the Shotgun site and creates an Attachment entity associated to the given entity.
# @param entity_type [String] type of the entity the media will be attached to
# @param record_id [number] id of the entity the media will be attached to
# @param field [String] field of the entity the media will be attached to. This can be a URL field or None. If None,
# the media is uploaded and the File (Attachment) entity is linked to the given entity.
# @param local_file_path [String] The path to the file to upload.
# @param content_type [String] MIME type of the file to upload. This will be associated to the uploaded file.
def upload(self, entity_type, record_id, field, local_file_path, content_type):
filename = os.path.basename(local_file_path)
fd = open(local_file_path, "rb")
fsize = os.path.getsize(local_file_path)
success = False
if fsize <= self.chunk_size:
success = self.__upload(entity_type, record_id, field, filename, fd, content_type)
else:
success = self.__upload_multipart(entity_type, record_id, field, filename, fd, content_type)
fd.close()
return success
# Gets a signed URL that can be used to download a media file.
# @param entity_type [String] type of the entity the media is attached to
# @param record_id [number] id of the entity the media is attached to
# @param field [String] field of the entity the media is attached to
def get_download_url(self, entity_type, record_id, field="image"):
headers = self.__setup_auth_headers()
headers["Accept"] = "application/json"
url = "%s/api/v1/entity/%s/%s/%s" % (self.host, entity_type, record_id, field)
result = None
try:
resp = requests.get(url, headers=headers)
resp.raise_for_status()
result = resp.json()["data"]
if isinstance(result, dict):
result = result["url"]
except requests.exceptions.RequestException as e:
self.__debug("Could not get the download url: %s" % e)
if not result:
result = ""
return result
# Download a media file.
# @param entity_type [String] type of the entity the media is attached to
# @param record_id [number] id of the entity the media is attached to
# @param field [String] field of the entity the media is attached to
# @param local_file_path [String] local file path that will contain the media
def download(self, entity_type, record_id, field, local_file_path):
url = self.get_download_url(entity_type, record_id, field)
if not url:
return None
result = False
try:
with requests.get(url, stream=True) as resp:
resp.raise_for_status()
with open(local_file_path, "wb") as fd:
shutil.copyfileobj(resp.raw, fd)
result = True
except requests.exceptions.RequestException as e:
self.__debug("Could not download: %s" % e)
return result
# Downloads the thumbnail associated to the given entity.
# @param entity_type [String] type of the entity the media is attached to
# @param record_id [number] id of the entity the media is attached to
# @param size [String] 'original' or 'thumbnail'.
# @param local_file_path [String] local path where to download the file
def download_thumbnail(self, entity_type, record_id, size, local_file_path):
headers = self.__setup_auth_headers()
headers["Accept"] = "application/json"
url = "%s/api/v1/entity/%s/%s/image?alt=%s"%(self.host, entity_type, record_id, size)
result = False
try:
resp = requests.get(url, allow_redirects=True, headers=headers)
open(local_file_path, "wb").write(resp.content)
result = True
except requests.exceptions.RequestException as e:
self.__debug("Could not download thumbnail: %s" % e)
return result
# Private
def __upload(self, entity_type, record_id, field, filename, fd, content_type):
# Get the upload info from SG (including the target URL)
upload_metadata = self.__get_upload_metadata(filename, entity_type, record_id, field)
if not upload_metadata:
return False
# Upload the data to the received target URL
upload_link = upload_metadata["links"]["upload"]
upload_response = self.__upload_data(upload_link, content_type, fd)
if not upload_response:
return False
# Complete the upload process
result = self.__complete_upload(upload_response, upload_metadata, filename)
if not result:
self.__debug("Could not complete the upload... aborting")
return False
return True
def __upload_multipart(self, entity_type, record_id, field, filename, fd, content_type):
# Get the upload info from SG (including the target URL for the first part to upload)
upload_metadata = self.__get_upload_metadata(filename, entity_type, record_id, field, multipart_upload=True)
if not upload_metadata:
self.__debug("Multipart uploads not supported for this storage type... (this is only supported for uploads to S3)")
return False
# Upload the data to the received target URL
upload_link = upload_metadata["links"]["upload"]
next_part_link = upload_metadata["links"]["get_next_part"]
etags = []
while True:
upload_response = self.__upload_data(upload_link, content_type, fd, self.chunk_size)
if not upload_response:
break
etags.append(upload_response["e_tag"])
if upload_response["nb_bytes_uploaded"] < self.chunk_size:
break
upload_part_metadata = self.__get_next_upload_part_metadata(next_part_link)
upload_link = upload_part_metadata["links"]["upload"]
next_part_link = upload_part_metadata["links"]["get_next_part"]
if not upload_response:
self.__debug("Could not upload data... aborting")
self.__abort_multipart_upload(upload_metadata)
return False
# Complete the upload process
upload_metadata["data"]["etags"] = etags
result = self.__complete_upload(upload_response, upload_metadata, filename)
if not result:
self.__debug("Could not complete the upload... aborting")
self.__abort_multipart_upload(upload_metadata)
return False
return True
def __get_upload_metadata(self, filename, entity_type, record_id, field, multipart_upload=False):
headers = self.__setup_auth_headers()
headers["Accept"] = "application/json"
params = {"filename": filename}
if multipart_upload:
params["multipart_upload"] = True
if field:
url = "%s/api/v1/entity/%s/%s/%s/_upload"% (self.host, entity_type, record_id, field)
else:
url = "%s/api/v1/entity/%s/%s/_upload"% (self.host, entity_type, record_id)
result = None
try:
resp = requests.get(url, params=params, headers=headers)
resp.raise_for_status()
result = resp.json()
except requests.exceptions.RequestException as e:
self.__debug("Could not get upload metadata. Aborting...: %s" % e)
return result
def __upload_data(self, upload_link, content_type, fd, max_bytes=None):
headers = {"Content-Type": content_type}
fd_temp = fd
if max_bytes:
fd_temp = io.BytesIO()
fd_temp.write(fd.read(max_bytes))
fd_temp.seek(0)
result = None
for i in range(5):
self.__debug("Uploading to: %s (nb_tries: %s)" % (upload_link, i+1))
try:
resp = requests.put(upload_link, data=fd_temp, headers=headers)
resp.raise_for_status()
result = resp.json() if resp.text else {"data": {}}
result["nb_bytes_uploaded"] = int(resp.request.headers["Content-Length"])
result["e_tag"] = resp.headers["ETag"]
break
except requests.exceptions.RequestException as e:
self.__debug("upload_data: Unexpected result: %s" % e)
fd_temp.seek(0)
print "Could not upload data. Retrying in 3 seconds..."
time.sleep(3)
return result
def __get_next_upload_part_metadata(self, get_next_part_url):
headers = self.__setup_auth_headers()
headers["Accept"] = "application/json"
url = self.host + get_next_part_url
result = None
try:
resp = requests.get(url, headers=headers)
resp.raise_for_status()
result = resp.json()
except requests.exceptions.RequestException as e:
self.__debug("get_next_upload_part_metadata: Unexpected result: %s" % e)
return result
def __complete_upload(self, upload_response, upload_metadata, filename):
complete_upload_info = upload_metadata["data"]
if upload_metadata["data"]["storage_service"] == "sg":
complete_upload_info["upload_id"] = upload_response["data"]["upload_id"]
complete_upload_data = {
"upload_info": complete_upload_info,
"upload_data": {
"display_name": filename
}
}
complete_upload_link = self.host + upload_metadata["links"]["complete_upload"]
headers = self.__setup_auth_headers()
headers["Content-Type"] = "application/json"
headers["Accept"] = "application/json"
result = False
try:
resp = requests.post(complete_upload_link, json=complete_upload_data, headers=headers)
resp.raise_for_status()
result = True
except requests.exceptions.RequestException as e:
self.__debug("Could not complete the upload: %s" % e)
return result
def __abort_multipart_upload(self, upload_metadata):
upload_info = upload_metadata["data"]
headers = self.__setup_auth_headers()
headers["Content-Type"] = "application/json"
headers["Accept"] = "application/json"
url = "%s%s/multipart_abort" % (self.host, upload_metadata["links"]["complete_upload"])
result = False
try:
resp = requests.post(url, headers=headers)
resp.raise_for_status()
result = True
except requests.exceptions.RequestException as e:
self.__debug("Could not abort the upload: %s" % e)
return result
def __setup_auth_headers(self):
if not self.access_expires_at:
self.__get_access_token()
elif self.access_expires_at <= datetime.datetime.now():
if self.__refresh_expires_at <= datetime.datetime.now():
self.__get_access_token()
else:
self.__get_access_token(grant_type='refresh')
return { 'Authorization': self.access_token }
def __get_access_token(self, grant_type = None):
self.access_token = None
self.refresh_token = None
self.access_expires_at = None
self.refresh_expires_at = None
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json"
}
params = None
if grant_type == 'refresh':
params = {
"grant_type": "refresh",
"refresh_token": self.refresh_token
}
if ("login" in self.credentials) and ("password" in self.credentials):
params={
"username": self.credentials["login"],
"password": self.credentials["password"],
"grant_type": "password"
}
elif ("client_id" in self.credentials) and ("client_secret" in self.credentials):
params={
"client_id": self.credentials["client_id"],
"client_secret": self.credentials["client_secret"],
"grant_type": "client_credentials"
}
else:
self.__debug("credentials format not supported")
return None
result = None
try:
resp = requests.post("%s/api/v1/auth/access_token" % self.host, params=params, headers=headers)
resp.raise_for_status()
result = resp.json()
self.access_token = "%s %s"%(result["token_type"], result["access_token"])
self.refresh_token = result['refresh_token']
self.access_expires_at = datetime.datetime.now() + datetime.timedelta(0,result['expires_in'])
self.refresh_expires_at = datetime.datetime.now() + datetime.timedelta(0,24*60*60)
except requests.exceptions.RequestException as e:
self.__debug("Cannot get access token. Unexpected result: %s" % e)
return result
def __debug(self, *args):
if not self.debug:
return
print(" DEBUG: " + " ".join(map(str,args)))
if __name__ == '__main__':
#########################
host = "https://yoursite.shotgunstudio.com"
login = "XXXXX"
password = "XXXXX"
sg_media_io = ShotgunMediaIO(host, {"login":login, "password": password}, debug = True)
###########################
target_entity = "Version"
target_record_id = 712
print "=============================================="
print "URL field workflow"
print "---------------------------------------"
print "Try to get the download URL of an empty link"
download_url = sg_media_io.get_download_url(target_entity, target_record_id, "sg_uploaded_movie")
print "download url: %s" % download_url
print "---------------------------------------"
print "Try to download an empty link"
result = sg_media_io.download(target_entity, target_record_id, "sg_uploaded_movie", "./downloaded_Sourissimo.mp4" )
print "result: %s" % result
print "---------------------------------------"
print "Upload data"
result = sg_media_io.upload(target_entity, target_record_id, "sg_uploaded_movie", "./Sourissimo.mp4", "video/mp4" )
print "result: %s" % result
print "---------------------------------------"
print "Download url for uploaded media"
download_url = sg_media_io.get_download_url(target_entity, target_record_id, "sg_uploaded_movie")
print("download url: %s" % download_url)
print "---------------------------------------"
print "Download media"
result = sg_media_io.download(target_entity, target_record_id, "sg_uploaded_movie", "./downloaded_Sourissimo.mp4" )
print "result: %s" % result
print "---------------------------------------"
print "=============================================="
print "Linked media workflow"
print "No field name: The file will be uploaded as a File (Attachment) entity linked to the given entity"
result = sg_media_io.upload(target_entity, target_record_id, None, "./sourissimo.jpeg", "image/jpeg")
print "result: %s" % result
print "---------------------------------------"
print "=============================================="
print "Thumbnail workflow"
print "Try download (small) thumbnail associated to an entity"
result = sg_media_io.download_thumbnail(target_entity, target_record_id, "thumbnail", "./downloaded_small_Sourissimo.jpg" )
print "result: %s" % result
print "Try download (original) thumbnail associated to an entity"
result = sg_media_io.download_thumbnail(target_entity, target_record_id, "original", "./downloaded_large_Sourissimo.jpg" )
print "result: %s" % result
print "---------------------------------------"
print "Try to get the download url for the small thumbnail for the entity"
result = download_url = sg_media_io.get_download_url(target_entity, target_record_id)
print("download url: #{download_url} (transient? #{Shotgun::MediaIO.is_transient? download_url})")
print "---------------------------------------"
print "Uploading to an image field: This will associate the thumbnail to the given entity"
# Note: Once the upload is completed, a process to generate a "small thumbnail" will automatically be started by Shotgun.
# That small thumbnail should be available to use within a few minutes...
result = sg_media_io.upload(target_entity, target_record_id, "image", "./sourissimo.jpeg", "image/jpeg" )
print "result: %s" % result
print "---------------------------------------"
print "Download url for the small thumbnail for the entity"
download_url = sg_media_io.get_download_url(target_entity, target_record_id, "image")
print( "download url: %s (transient? %s)"%(download_url, sg_media_io.is_transient(download_url)))
print "---------------------------------------"
print "Download url for the small thumbnail for the entity"
download_url = sg_media_io.get_download_url(target_entity, target_record_id)
print( "download url: %s (transient? %s)"%(download_url, sg_media_io.is_transient(download_url)))
print "---------------------------------------"
print "download (small) thumbnail uploaded for that entity"
result = sg_media_io.download_thumbnail(target_entity, target_record_id, "thumbnail", "./downloaded_small_Sourissimo.jpg" )
print "result: %s" % result
print "download (original) thumbnail uploaded for that entity"
result = sg_media_io.download_thumbnail(target_entity, target_record_id, "original", "./downloaded_large_Sourissimo.jpg" )
print "result: %s" % result
print "---------------------------------------"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment