Last active
May 5, 2022 04:02
-
-
Save danielskovli/cfec8aae6c0e1ab7e418e5a222a489fb to your computer and use it in GitHub Desktop.
Simple demo of a thread-safe implementation of Shotgun Python API via pool storage and a pool (context) manager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
''' | |
Shotgun thread-safe API call demo for Python 3.x | |
Requires Shotgun Python API: https://github.com/shotgunsoftware/python-api | |
Context manager implementation based on concept from ArjanCodes: https://www.youtube.com/watch?v=Rm4JP7JfsKY | |
All code provided as a single file for ease of testing/demo. Should most definitely be split into | |
relevant modules if implemented in production. | |
`ShotgunConfig` must be completed as applicable for your given Shotgun setup. | |
''' | |
from __future__ import annotations | |
from typing import Any, Callable | |
import time | |
import threading | |
import shotgun_api3 | |
class ShotgunConfig: | |
'''ShotGrid credentials and config''' | |
host = '<Shotgun site url>' | |
clientId = '<script API name>' | |
clientSecret = '<script API secret>' | |
demoProjectId = 0 | |
class Entity: | |
'''Entity types''' | |
project = 'Project' | |
shot = 'Shot' | |
class Fields: | |
'''Default field requests''' | |
_default = ['id', 'code', 'sg_status', 'sg_status_list', 'description'] | |
shot = _default + ['sg_camera_lens', 'sg_camera_framing', 'sg_camera_description', 'sg_sequence'] | |
class NoAvailableInstancesError(Exception): | |
'''Pool manager does not have any available instances to provide''' | |
class InstancePoolManager: | |
'''Manager for `InstancePool`''' | |
def __init__(self, pool: InstancePool): | |
'''Initialize a new InstancePoolManager. This object will handle enter/exit hooks during a `with` clause | |
Args: | |
pool (InstancePool): The InstancePool to interact with | |
''' | |
self.pool = pool | |
self.obj = None | |
def __enter__(self): | |
'''User-code has entered `with` clause, acquire Shotgun instance''' | |
self.instance = self.pool.acquire() | |
print(f'Manager: Allocated Shotgun instance with ID {id(self.instance)} (session token {self.instance.config.session_token})') | |
return self.instance | |
def __exit__(self, *args): | |
'''User-code has exited `with` clause, release Shotgun instance''' | |
self.pool.release(self.instance) | |
class InstancePool: | |
'''Instance pool that keeps track of `Shotgun` instances''' | |
def __init__(self, host: str, scriptName: str, apiKey: str, size: int=-1): | |
'''Initialize a new InstancePool | |
Args: | |
host (str): Base URL to Shotgun site. Eg. https://your-site.shotgunstudio.com | |
scriptName (str): API key name | |
apiKey (str): API key secret | |
size (int, optional): Max pool size. Defaults to -1, which means unlimited | |
''' | |
self.host = host | |
self.scriptName = scriptName | |
self.apiKey = apiKey | |
self.size = size | |
self.free: list[shotgun_api3.Shotgun] = [] | |
self.inUse: list[shotgun_api3.Shotgun] = [] | |
@property | |
def currentSize(self) -> int: | |
return len(self.free) + len(self.inUse) | |
def acquire(self) -> shotgun_api3.Shotgun: | |
'''Acquire an instance from the pool. Recycle if possible, create new if required (within `self.size` limits)''' | |
numFree = len(self.free) | |
numUsed = len(self.inUse) | |
if self.size > -1 and numFree == 0 and numUsed >= self.size: | |
raise NoAvailableInstancesError(f'No further instances can be allocated, as defined by user-defined maximum pool size: {self.size}') | |
instance: shotgun_api3.Shotgun | |
if numFree: | |
print('Acquire: Returning existing free instance') | |
instance = self.free.pop(0) | |
else: | |
print('Acquire: Genereting new instance') | |
instance = self.instanceFactory() | |
self.inUse.append(instance) | |
return instance | |
def release(self, r: shotgun_api3.Shotgun): | |
'''Release an instance -> move it from `inUse` to `free`''' | |
self.inUse.remove(r) | |
self.free.append(r) | |
def instanceFactory(self) -> shotgun_api3.Shotgun: | |
'''Generate a new, or clone existing shotgun connection as applicable''' | |
existingInstance: shotgun_api3.Shotgun|None = None | |
# Realistically this never happens if called from `self.acquire` | |
if self.free and self.free[0].config.session_token: | |
existingInstance = self.free[0] | |
# This is more likely to happen, since the reason we're generating an instance is because all existing ones are busy | |
elif self.inUse and self.inUse[0].config.session_token: | |
existingInstance = self.inUse[0] | |
# We have an instance, clone it | |
if existingInstance: | |
print(f'Factory: Using existing instance session token: {existingInstance.config.session_token}') | |
instance = shotgun_api3.Shotgun( | |
base_url = self.host, | |
connect = False, | |
session_token = existingInstance.config.session_token | |
# session_token = existingInstance.get_session_token() | |
) | |
instance._connection = None | |
return instance | |
# Need to generate new instance, which will require authentication | |
else: | |
print('Factory: Generating new instance with auth creds') | |
instance = shotgun_api3.Shotgun( | |
base_url = self.host, | |
script_name = self.scriptName, | |
api_key = self.apiKey | |
) | |
instance.config.session_token = instance.get_session_token() # Force auth, store session token | |
return instance | |
class SingleThread(threading.Thread): | |
def __init__(self, target: Callable, callbacks: Callable|list[Callable]=None) -> None: | |
'''Generic thread for API calls, taking a target method and (optional) callback method reference. | |
To start the execution of this thread, `.start()` must explicitly be called. See `threading.Thread` docs for further details. | |
Args: | |
target (Callable): Target method to execute | |
callbacks (Callable|list[Callable], optional): Callback method(s) to execute after thread is finished. Must accept `targetReference, *args, **kwargs` payload -> the latter two being the result from `target()` | |
''' | |
self.target = target | |
self.callbacks = callbacks | |
self._die: bool = False | |
super().__init__(target=self.wrapper) | |
def kill(self): | |
'''This does not exactly kill the thread, but will prevent them from executing callbacks''' | |
self._die = True | |
def wrapper(self) -> None: | |
result = self.target() | |
if self._die: | |
return | |
if self.callbacks: | |
if not hasattr(self.callbacks, '__iter__'): | |
self.callbacks = [self.callbacks] | |
callback: Callable | |
for callback in self.callbacks: # type: ignore | |
if self._die: | |
return | |
callback(self.target, result) | |
class ShotgunClient: | |
'''Shotgun API Wrapper''' | |
def __init__(self, poolSize: int=-1) -> None: | |
'''Shotgun API wrapper. | |
Most methods will block while waiting for http, so best called on a separate thread. | |
To access the `shotgun_api3.Shotgun` instance directly at any stage, use the `InstancePoolManager` or in a pinch, the `.instance` getter | |
''' | |
super().__init__() | |
self.instancePool = InstancePool( | |
host=ShotgunConfig.host, | |
scriptName=ShotgunConfig.clientId, | |
apiKey=ShotgunConfig.clientSecret, | |
size=poolSize | |
) | |
@property | |
def instance(self) -> shotgun_api3.Shotgun: | |
'''Acquires a `Shotgun` instance from the instance pool directly. | |
This will work, and will be tracked, but will never be recycled unless done so manually by the caller | |
Eg. herein lies memory leaks... | |
A better way to access the Shotgun instance is to call the pool manager via `with InstancePoolManager(self.instancePool) as sg: ...` | |
''' | |
# This will be tracked in the pool, but unless the caller manually releases it, | |
# the instance will never be returned and recycled | |
return self.instancePool.acquire() | |
@classmethod | |
def generateEntityObject(cls, entityType: str, shotgunId: int) -> dict[str, Any]: | |
'''Helper: Generate and return a dict containing the correct query parameter for an entity type + id''' | |
return { | |
'type': entityType, | |
'id': shotgunId | |
} | |
@classmethod | |
def generateDefaultProjectFilter(cls, projectId: int|None) -> list[Any]|list[list[Any]]: | |
'''Helper: Generate and return a default filter for the given project id''' | |
if projectId is not None: | |
return [ | |
[ShotgunConfig.Entity.project.lower(), 'is', cls.generateEntityObject(ShotgunConfig.Entity.project, projectId)], | |
] | |
else: | |
return [] | |
def getShots(self, projectId=None, filters=None, fields=None) -> list[dict[str, Any]]: | |
'''Get all shots from a given project with the given filter/fields, or defaults''' | |
with InstancePoolManager(self.instancePool) as sg: | |
print(f'ShotgunClient: InstanceManager returned Shotgun instance with ID {id(sg)} (session token {sg.config.session_token})') | |
return sg.find( | |
entity_type = ShotgunConfig.Entity.shot, | |
filters = filters or self.generateDefaultProjectFilter(projectId), | |
fields = fields or ShotgunConfig.Fields.shot | |
) | |
def main(): | |
print('Initiating test') | |
sgClient = ShotgunClient() | |
# sgClient = ShotgunClient(poolSize=5) # Uncomment to trigger `NoAvailableInstancesError` | |
numQueries = 10 | |
time1 = time.time() | |
print(f'Pool size (initial): {sgClient.instancePool.currentSize}') | |
print(f'Executing {numQueries} sequental API calls:') | |
for i in range(numQueries): | |
print(f'Getting shots with loop index {i}...') | |
shots = sgClient.getShots(projectId=650) | |
print(f'Found {len(shots)} shots') | |
time2 = time.time() | |
print('Sequential benchmark: {:.3f} seconds'.format(time2-time1)) | |
print(f'Pool size (after sequential loop): {sgClient.instancePool.currentSize}') | |
print(f'Executing {numQueries} threaded API calls:') | |
_threads: list[SingleThread] = [] | |
for i in range(numQueries): | |
print(f'Spinning up thread with loop index {i}...') | |
thread = SingleThread( | |
target=lambda: sgClient.getShots(projectId=ShotgunConfig.demoProjectId), | |
callbacks=_threadCallback | |
) | |
_threads.append(thread) | |
thread.start() | |
print('Waiting for threads...') | |
for thread in _threads: | |
thread.join() | |
time3 = time.time() | |
print(f'\nPool size (after threading): {sgClient.instancePool.currentSize}') | |
print('Threaded benchmark {:.3f} seconds'.format(time3-time2)) | |
print('"Manual" call') | |
with InstancePoolManager(sgClient.instancePool) as sg: | |
shots = sg.find( | |
entity_type = ShotgunConfig.Entity.shot, | |
filters = sgClient.generateDefaultProjectFilter(ShotgunConfig.demoProjectId), | |
fields = ShotgunConfig.Fields.shot | |
) | |
print(f'Found {len(shots)} shots') | |
print(f'\nPool size (after manual): {sgClient.instancePool.currentSize}') | |
print('\nAll done') | |
def _threadCallback(target, result): | |
result = result or [] | |
print(f'Threaded method {target} on thread {threading.get_ident()} found {len(result)} shots') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment