# Source: GitHub Gist thomastraum/c18a42482b25714fcb8f5e33cc3bfbad
# Author: @thomastraum
# Created: July 1, 2024 15:50
import ftrack_api
import os
import requests
import logging
import mimetypes
class DownloadAnyVersion(object):
    """ftrack action that downloads review media of the selected versions.

    For every selected entity that resolves to an ``AssetVersion`` the action
    looks for components whose name contains ``ftrackreview-mp4-1080`` or
    ``ftrackreview-image`` and saves them below
    ``~/Downloads/ftrack_downloads``.
    """

    # Text shown for this action in the ftrack actions menu.
    label = "Download MP4 and Image Reviews"
    # Identifier used to route launch events to this action.
    identifier = "download.mp4.and.image.reviews"

    # Substrings that mark a component as downloadable review media.
    _REVIEW_MARKERS = ("ftrackreview-mp4-1080", "ftrackreview-image")
    # (connect, read) timeouts in seconds so a stalled server cannot block
    # the event-hub callback forever.
    _REQUEST_TIMEOUT = (10, 120)
    # Size of the chunks streamed to disk; keeps memory use flat for big files.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, session):
        """Store *session* and set up a DEBUG-level logger for the action."""
        self.session = session
        self.logger = logging.getLogger(__name__ + "." + self.__class__.__name__)
        self.logger.setLevel(logging.DEBUG)

    def register(self):
        """Subscribe :meth:`discover` and :meth:`launch` to the event hub.

        Discovery is limited to events coming from the session's API user;
        launch events are matched on this action's identifier.
        """
        self.session.event_hub.subscribe(
            "topic=ftrack.action.discover and source.user.username={0}".format(
                self.session.api_user
            ),
            self.discover,
        )
        self.session.event_hub.subscribe(
            "topic=ftrack.action.launch and data.actionIdentifier={0}".format(
                self.identifier
            ),
            self.launch,
        )

    def discover(self, event):
        """Return the menu entry describing this action for *event*."""
        return {
            "items": [
                {
                    "label": self.label,
                    "actionIdentifier": self.identifier,
                }
            ]
        }

    def launch(self, event):
        """Download the review components of every selected asset version.

        Returns a dict with a boolean ``success`` flag and a human readable
        ``message``. Failures of individual components are logged and do not
        abort the remaining downloads.
        """
        self.logger.info("Action launched")
        selection = event["data"].get("selection", [])
        if not selection:
            self.logger.warning("No items selected")
            return {"success": False, "message": "No items selected."}

        download_location = os.path.expanduser("~/Downloads/ftrack_downloads")
        self.logger.info(f"Download location: {download_location}")
        os.makedirs(download_location, exist_ok=True)

        server_location = self.session.query(
            'Location where name is "ftrack.server"'
        ).one()

        downloaded_files = []
        for entity in selection:
            self.logger.info(
                f"Processing entity: {entity['entityType']} (ID: {entity['entityId']})"
            )
            asset_version = self.session.get("AssetVersion", entity["entityId"])
            if asset_version is None:
                # The action is offered for any selection, so the id may
                # belong to something that is not an AssetVersion.
                self.logger.warning(
                    f"Skipping entity {entity['entityId']}: not an AssetVersion"
                )
                continue

            self.logger.info(f"Asset Version ID: {asset_version['id']}")
            self.logger.info(f"Asset Name: {asset_version['asset']['name']}")
            self.logger.info(f"Version Number: {asset_version['version']}")

            for component in asset_version["components"]:
                if not self._is_review_component(component["name"]):
                    self.logger.info(f"Skipping component: {component['name']}")
                    continue
                try:
                    destination = self._download_component(
                        server_location, asset_version, component, download_location
                    )
                except Exception:
                    # Best effort: keep going with the other components, but
                    # keep the traceback in the log.
                    self.logger.exception(
                        f"Error downloading component {component['name']}"
                    )
                    continue
                if destination:
                    downloaded_files.append(destination)

        if downloaded_files:
            message = f"Downloaded {len(downloaded_files)} files to {download_location}"
            self.logger.info(message)
            return {"success": True, "message": message}

        message = "No MP4 or image review files were found or downloaded. Check logs for details."
        self.logger.warning(message)
        return {"success": False, "message": message}

    @classmethod
    def _is_review_component(cls, component_name):
        """Return True when *component_name* contains a review marker."""
        return any(marker in component_name for marker in cls._REVIEW_MARKERS)

    @staticmethod
    def _safe_file_name(name):
        """Return *name* with characters unsuitable for a file name replaced by ``_``."""
        return "".join(
            char if char.isalnum() or char in "._- " else "_" for char in str(name)
        )

    @staticmethod
    def _guess_extension(component_name, download_url, content_type):
        """Return the file extension (with leading dot) for a component.

        MP4 components always get ``.mp4``. For image components the response
        ``Content-Type`` is preferred, then the URL, then ``.png``. Any other
        component name yields an empty extension.
        """
        lowered = component_name.lower()
        if "mp4" in lowered:
            return ".mp4"
        if "image" not in lowered:
            return ""
        # Drop parameters such as "; charset=..." before the lookup.
        mime_type = (content_type or "").split(";", 1)[0].strip().lower()
        if not mime_type.startswith("image/"):
            mime_type, _ = mimetypes.guess_type(download_url)
        if mime_type:
            return mimetypes.guess_extension(mime_type) or ".png"
        return ".png"

    def _download_component(
        self, server_location, asset_version, component, download_location
    ):
        """Stream one component to *download_location*.

        Returns the destination path, or ``None`` when the component has no
        URL or the server does not answer with status 200.
        """
        component_name = component["name"]
        self.logger.info(f"Processing component: {component_name}")
        self.logger.info(f"Component ID: {component['id']}")

        download_url = server_location.get_url(component)
        if not download_url:
            self.logger.warning(
                f"No download URL found for component: {component_name}"
            )
            return None

        # NOTE(review): the query string may carry credentials; keep it out
        # of the log - confirm against the server location implementation.
        loggable_url = download_url.split("?", 1)[0]
        self.logger.info(f"Download URL: {loggable_url}")

        with requests.get(
            download_url, stream=True, timeout=self._REQUEST_TIMEOUT
        ) as response:
            if response.status_code != 200:
                self.logger.error(
                    f"Failed to download {loggable_url}. Status code: {response.status_code}"
                )
                return None

            extension = self._guess_extension(
                component_name, download_url, response.headers.get("Content-Type")
            )
            base_name = self._safe_file_name(
                f"{asset_version['asset']['name']}_{asset_version['version']}_{component_name}"
            )
            destination = os.path.join(download_location, base_name + extension)
            self.logger.info(f"Destination: {destination}")

            # Write to a temporary name first so an interrupted transfer never
            # leaves a truncated file under the final name.
            partial_path = destination + ".part"
            try:
                with open(partial_path, "wb") as handle:
                    for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                        if chunk:
                            handle.write(chunk)
                os.replace(partial_path, destination)
            finally:
                # Only present here when the transfer or the rename failed.
                if os.path.exists(partial_path):
                    os.remove(partial_path)

        self.logger.info(f"Successfully downloaded: {destination}")
        return destination
def register(session):
    """Build the download action for *session* and hook it into the event hub."""
    DownloadAnyVersion(session).register()
if __name__ == "__main__":
    # Install a handler so the action's INFO-level progress messages are
    # actually emitted; without one only warnings and errors reach stderr.
    logging.basicConfig(level=logging.INFO)
    # The event hub has to be connected for the subscriptions made in
    # register() to receive any events while waiting below.
    session = ftrack_api.Session(auto_connect_event_hub=True)
    register(session)
    # Block and dispatch incoming discover/launch events.
    session.event_hub.wait()
# End of gist.