Skip to content

Instantly share code, notes, and snippets.

@tokejepsen
Last active December 2, 2019 11:21
Show Gist options
  • Save tokejepsen/397171800fa0adccec980ddb564cbc22 to your computer and use it in GitHub Desktop.
Save tokejepsen/397171800fa0adccec980ddb564cbc22 to your computer and use it in GitHub Desktop.
Ftrack: Check Resolution Action
import subprocess
import logging
import sys
import argparse
import webbrowser
import traceback
import json
import ftrack_api
class CheckResolutionAction(object):
'''Custom Action base class
`label` a descriptive string identifing your action.
`varaint` To group actions together, give them the same
label and specify a unique variant per action.
`identifier` a unique identifier for your action.
`description` a verbose descriptive text for you action
'''
label = "Check Resolution"
variant = None
identifier = "check-resolution"
description = None
def __init__(self, session):
'''Expects a ftrack_api.Session instance'''
self.session = session
self.logger = logging.getLogger(
'{0}.{1}'.format(__name__, self.__class__.__name__)
)
if self.label is None:
raise ValueError(
'Action missing label.'
)
elif self.identifier is None:
raise ValueError(
'Action missing identifier.'
)
self._session = session
def get_resolution(self, session, assetversions):
"""Gets the resolution of the review component, with ffmpeg."""
data = []
location = session.pick_location()
for assetversion in assetversions:
review_component = None
for component in assetversion["components"]:
if component["name"] == "ftrackreview-mp4_src":
review_component = component
filepath = location.get_filesystem_path(review_component)
args = [
"ffprobe",
"-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=width,height",
"-of", "csv=s=x:p=0",
filepath
]
output = subprocess.check_output(args).rsplit()[0].decode("utf-8")
data.append(
{
"width": int(output.split("x")[0]),
"height": int(output.split("x")[1]),
"entity": assetversion
}
)
return data
def check_resolution(self, session, assetversions, expected_resolution):
"""Checks the resolution of the review against an expect dimension.
assetversions: list of assetversions
expected_resolution: {
"width": 1920,
"height": 1080
}
"""
data = self.get_resolution(session, assetversions)
for item in data:
item["expected_resolution"] = True
if item["width"] < expected_resolution["width"]:
item["expected_resolution"] = False
if item["height"] < expected_resolution["height"]:
item["expected_resolution"] = False
return data
def register(self):
'''Registers the action, subscribing the the discover and launch
topics.'''
self._session.event_hub.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
self.session.api_user
),
self._discover
)
self._session.event_hub.subscribe(
'topic=ftrack.action.launch and data.actionIdentifier={0}'.format(
self.identifier
),
self._launch
)
def _discover(self, event):
args = self._translate_event(
self._session, event
)
accepts = self.discover(
self._session, *args
)
if accepts:
return {
'items': [{
'label': self.label,
'variant': self.variant,
'description': self.description,
'actionIdentifier': self.identifier,
}]
}
def discover(self, session, entities, event):
'''Return true if we can handle the selected entities.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the
entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
allow_discovery = True
for entity in entities:
if entity[0] != "AssetVersion":
allow_discovery = False
return allow_discovery
def _translate_event(self, session, event):
'''Return *event* translated structure to be used with the API.'''
_selection = event['data'].get('selection', [])
_entities = list()
for entity in _selection:
_entities.append(
(
self._get_entity_type(entity), entity.get('entityId')
)
)
return [
_entities,
event
]
def _get_entity_type(self, entity):
'''Return translated entity type tht can be used with API.'''
entity_type = entity.get('entityType')
object_typeid = None
for schema in self._session.schemas:
alias_for = schema.get('alias_for')
if (
alias_for and isinstance(alias_for, dict) and
alias_for['id'].lower() == entity_type and
object_typeid == alias_for.get(
'classifiers', {}
).get('object_typeid')
):
return schema['id']
for schema in self._session.schemas:
alias_for = schema.get('alias_for')
check_string = None
if sys.version_info[0] < 3:
check_string = basestring
else:
check_string = str
if (
alias_for and isinstance(alias_for, check_string) and
alias_for.lower() == entity_type
):
return schema['id']
for schema in self._session.schemas:
if schema['id'].lower() == entity_type:
return schema['id']
raise ValueError(
'Unable to translate entity type.'
)
def _launch(self, event):
args = self._translate_event(
self._session, event
)
response = self.launch(
self._session, *args
)
return self._handle_result(
self._session, response, *args
)
def _translate_entities(self, session, entities):
"""Translating string based entity data to Python objects.
*entities* is a list of tuples each containing the entity type and the
entity id.
"""
result = []
for entity in entities:
result.append(session.get(entity[0], entity[1]))
return result
def check_entities(self, session, entities, event):
width = int(event["data"]["values"]["expected_width"])
height = int(event["data"]["values"]["expected_height"])
data = self.check_resolution(
session,
self._translate_entities(session, entities),
{"width": width, "height": height}
)
items = []
self.failed_data = []
for item in data:
if item["expected_resolution"]:
continue
self.failed_data.append(item)
items.append(
{
'label': "{} / {} v{}".format(
item["entity"]["asset"]["parent"]["name"],
item["entity"]["asset"]["name"],
item["entity"]["version"]
),
'type': 'text',
'value': "{}x{}".format(
item["width"], item["height"]
),
'name': item["entity"]["id"]
}
)
if not items:
return {
"success": True, "message": "All entities checked."
}
items.insert(0, {'value': "Failed entities:", 'type': 'label'})
items.append(
{
'value': (
"Pressing \"Submit\" will open all"
" failed entities."
),
'type': 'label'
}
)
items.append(
{
'value': '',
'name': 'failed_check',
'type': 'hidden'
}
)
return {"success": True, "message": "", "items": items}
def launch(self, session, entities, event):
'''Callback method for the custom action.
return either a bool ( True if successful or False if the action
failed ) or a dictionary with they keys `message` and `success`, the
message should be a string and will be displayed as feedback to the
user, success should be a bool, True if successful or False if the
action failed.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the
entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
if 'values' in event['data']:
values = event['data']['values']
if 'expected_width' in values:
# Create job and for progress feedback.
user = session.query(
"User where username is \"{0}\"".format(
event["source"]["user"]["username"]
)
).one()
job = session.create(
"Job",
{
"user": user,
"status": "running",
"data": json.dumps(
{"description": "Check Resolution: Scanning..."}
)
}
)
session.commit()
result = {}
try:
result = self.check_entities(session, entities, event)
except Exception:
print(traceback.format_exc())
job["status"] = "failed"
else:
job["status"] = "done"
session.commit()
return result
if 'failed_check' in values:
for item in self.failed_data:
webbrowser.open(
session.get_widget_url(
"info", entity=item["entity"]["task"]
)
)
return {"success": True, "message": "All entities opened."}
return {
"success": True,
"message": "",
"items": [
{
"label": "Expected width",
"type": "number",
"name": "expected_width",
"value": 1920
},
{
"label": "Expected height",
"type": "number",
"name": "expected_height",
"value": 1080
}
]
}
def _handle_result(self, session, result, entities, event):
'''Validate the returned result from the action callback'''
if isinstance(result, bool):
result = {
'success': result,
'message': (
'{0} launched successfully.'.format(
self.label
)
)
}
elif isinstance(result, dict):
for key in ('success', 'message'):
if key in result:
continue
raise KeyError(
'Missing required key: {0}.'.format(key)
)
else:
self.logger.error(
'Invalid result type must be bool or dictionary!'
)
session.commit()
return result
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
CheckResolutionAction(session).register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment