Skip to content

Instantly share code, notes, and snippets.

@armohamm
Forked from danielskovli/shotgunThreadSafeTest.py
Created February 7, 2022 15:37
Show Gist options
  • Save armohamm/5d6daf0b2db1f3727412129ab7e309f4 to your computer and use it in GitHub Desktop.
Save armohamm/5d6daf0b2db1f3727412129ab7e309f4 to your computer and use it in GitHub Desktop.
Simple demo of a thread-safe implementation of Shotgun Python API via pool storage and a pool (context) manager
# -*- coding: utf-8 -*-
'''
Shotgun thread-safe API call demo for Python 3.x
Requires Shotgun Python API: https://github.com/shotgunsoftware/python-api
Context manager implementation based on concept from ArjanCodes: https://www.youtube.com/watch?v=Rm4JP7JfsKY
All code provided as a single file for ease of testing/demo. Should most definitely be split into
relevant modules if implemented in production.
`ShotgunConfig` must be completed as applicable for your given Shotgun setup.
'''
from __future__ import annotations
from typing import Any, Callable
import time
import threading
import shotgun_api3
class ShotgunConfig:
'''ShotGrid credentials and config'''
host = '<Shotgun site url>'
clientId = '<script API name>'
clientSecret = '<script API secret>'
demoProjectId = 0
class Entity:
'''Entity types'''
project = 'Project'
shot = 'Shot'
class Fields:
'''Default field requests'''
_default = ['id', 'code', 'sg_status', 'sg_status_list', 'description']
shot = _default + ['sg_camera_lens', 'sg_camera_framing', 'sg_camera_description', 'sg_sequence']
class NoAvailableInstancesError(Exception):
'''Pool manager does not have any available instances to provide'''
class InstancePoolManager:
'''Manager for `InstancePool`'''
def __init__(self, pool: InstancePool):
'''Initialize a new InstancePoolManager. This object will handle enter/exit hooks during a `with` clause
Args:
pool (InstancePool): The InstancePool to interact with
'''
self.pool = pool
self.obj = None
def __enter__(self):
'''User-code has entered `with` clause, acquire Shotgun instance'''
self.instance = self.pool.acquire()
print(f'Manager: Allocated Shotgun instance with ID {id(self.instance)} (session token {self.instance.config.session_token})')
return self.instance
def __exit__(self, *args):
'''User-code has exited `with` clause, release Shotgun instance'''
self.pool.release(self.instance)
class InstancePool:
'''Instance pool that keeps track of `Shotgun` instances'''
def __init__(self, host: str, scriptName: str, apiKey: str, size: int=-1):
'''Initialize a new InstancePool
Args:
host (str): Base URL to Shotgun site. Eg. https://your-site.shotgunstudio.com
scriptName (str): API key name
apiKey (str): API key secret
size (int, optional): Max pool size. Defaults to -1, which means unlimited
'''
self.host = host
self.scriptName = scriptName
self.apiKey = apiKey
self.size = size
self.free: list[shotgun_api3.Shotgun] = []
self.inUse: list[shotgun_api3.Shotgun] = []
@property
def currentSize(self) -> int:
return len(self.free) + len(self.inUse)
def acquire(self) -> shotgun_api3.Shotgun:
'''Acquire an instance from the pool. Recycle if possible, create new if required (within `self.size` limits)'''
numFree = len(self.free)
numUsed = len(self.inUse)
if self.size > -1 and numFree == 0 and numUsed >= self.size:
raise NoAvailableInstancesError(f'No further instances can be allocated, as defined by user-defined maximum pool size: {self.size}')
instance: shotgun_api3.Shotgun
if numFree:
print('Acquire: Returning existing free instance')
instance = self.free.pop(0)
else:
print('Acquire: Genereting new instance')
instance = self.instanceFactory()
self.inUse.append(instance)
return instance
def release(self, r: shotgun_api3.Shotgun):
'''Release an instance -> move it from `inUse` to `free`'''
self.inUse.remove(r)
self.free.append(r)
def instanceFactory(self) -> shotgun_api3.Shotgun:
'''Generate a new, or clone existing shotgun connection as applicable'''
existingInstance: shotgun_api3.Shotgun|None = None
# Realistically this never happens if called from `self.acquire`
if self.free and self.free[0].config.session_token:
existingInstance = self.free[0]
# This is more likely to happen, since the reason we're generating an instance is because all existing ones are busy
elif self.inUse and self.inUse[0].config.session_token:
existingInstance = self.inUse[0]
# We have an instance, clone it
if existingInstance:
print(f'Factory: Using existing instance session token: {existingInstance.config.session_token}')
instance = shotgun_api3.Shotgun(
base_url = self.host,
connect = False,
session_token = existingInstance.config.session_token
# session_token = existingInstance.get_session_token()
)
instance._connection = None
return instance
# Need to generate new instance, which will require authentication
else:
print('Factory: Generating new instance with auth creds')
instance = shotgun_api3.Shotgun(
base_url = self.host,
script_name = self.scriptName,
api_key = self.apiKey
)
instance.config.session_token = instance.get_session_token() # Force auth, store session token
return instance
class SingleThread(threading.Thread):
def __init__(self, target: Callable, callbacks: Callable|list[Callable]=None) -> None:
'''Generic thread for API calls, taking a target method and (optional) callback method reference.
To start the execution of this thread, `.start()` must explicitly be called. See `threading.Thread` docs for further details.
Args:
target (Callable): Target method to execute
callbacks (Callable|list[Callable], optional): Callback method(s) to execute after thread is finished. Must accept `targetReference, *args, **kwargs` payload -> the latter two being the result from `target()`
'''
self.target = target
self.callbacks = callbacks
self._die: bool = False
super().__init__(target=self.wrapper)
def kill(self):
'''This does not exactly kill the thread, but will prevent them from executing callbacks'''
self._die = True
def wrapper(self) -> None:
result = self.target()
if self._die:
return
if self.callbacks:
if not hasattr(self.callbacks, '__iter__'):
self.callbacks = [self.callbacks]
callback: Callable
for callback in self.callbacks: # type: ignore
if self._die:
return
callback(self.target, result)
class ShotgunClient:
'''Shotgun API Wrapper'''
def __init__(self, poolSize: int=-1) -> None:
'''Shotgun API wrapper.
Most methods will block while waiting for http, so best called on a separate thread.
To access the `shotgun_api3.Shotgun` instance directly at any stage, use the `InstancePoolManager` or in a pinch, the `.instance` getter
'''
super().__init__()
self.instancePool = InstancePool(
host=ShotgunConfig.host,
scriptName=ShotgunConfig.clientId,
apiKey=ShotgunConfig.clientSecret,
size=poolSize
)
@property
def instance(self) -> shotgun_api3.Shotgun:
'''Acquires a `Shotgun` instance from the instance pool directly.
This will work, and will be tracked, but will never be recycled unless done so manually by the caller
Eg. herein lies memory leaks...
A better way to access the Shotgun instance is to call the pool manager via `with InstancePoolManager(self.instancePool) as sg: ...`
'''
# This will be tracked in the pool, but unless the caller manually releases it,
# the instance will never be returned and recycled
return self.instancePool.acquire()
@classmethod
def generateEntityObject(cls, entityType: str, shotgunId: int) -> dict[str, Any]:
'''Helper: Generate and return a dict containing the correct query parameter for an entity type + id'''
return {
'type': entityType,
'id': shotgunId
}
@classmethod
def generateDefaultProjectFilter(cls, projectId: int|None) -> list[Any]|list[list[Any]]:
'''Helper: Generate and return a default filter for the given project id'''
if projectId is not None:
return [
[ShotgunConfig.Entity.project.lower(), 'is', cls.generateEntityObject(ShotgunConfig.Entity.project, projectId)],
]
else:
return []
def getShots(self, projectId=None, filters=None, fields=None) -> list[dict[str, Any]]:
'''Get all shots from a given project with the given filter/fields, or defaults'''
with InstancePoolManager(self.instancePool) as sg:
print(f'ShotgunClient: InstanceManager returned Shotgun instance with ID {id(sg)} (session token {sg.config.session_token})')
return sg.find(
entity_type = ShotgunConfig.Entity.shot,
filters = filters or self.generateDefaultProjectFilter(projectId),
fields = fields or ShotgunConfig.Fields.shot
)
def main():
print('Initiating test')
sgClient = ShotgunClient()
# sgClient = ShotgunClient(poolSize=5) # Uncomment to trigger `NoAvailableInstancesError`
numQueries = 10
time1 = time.time()
print(f'Pool size (initial): {sgClient.instancePool.currentSize}')
print(f'Executing {numQueries} sequental API calls:')
for i in range(numQueries):
print(f'Getting shots with loop index {i}...')
shots = sgClient.getShots(projectId=650)
print(f'Found {len(shots)} shots')
time2 = time.time()
print('Sequential benchmark: {:.3f} seconds'.format(time2-time1))
print(f'Pool size (after sequential loop): {sgClient.instancePool.currentSize}')
print(f'Executing {numQueries} threaded API calls:')
_threads: list[SingleThread] = []
for i in range(numQueries):
print(f'Spinning up thread with loop index {i}...')
thread = SingleThread(
target=lambda: sgClient.getShots(projectId=ShotgunConfig.demoProjectId),
callbacks=_threadCallback
)
_threads.append(thread)
thread.start()
print('Waiting for threads...')
for thread in _threads:
thread.join()
time3 = time.time()
print(f'\nPool size (after threading): {sgClient.instancePool.currentSize}')
print('Threaded benchmark {:.3f} seconds'.format(time3-time2))
print('"Manual" call')
with InstancePoolManager(sgClient.instancePool) as sg:
shots = sg.find(
entity_type = ShotgunConfig.Entity.shot,
filters = sgClient.generateDefaultProjectFilter(ShotgunConfig.demoProjectId),
fields = ShotgunConfig.Fields.shot
)
print(f'Found {len(shots)} shots')
print(f'\nPool size (after manual): {sgClient.instancePool.currentSize}')
print('\nAll done')
def _threadCallback(target, result):
result = result or []
print(f'Threaded method {target} on thread {threading.get_ident()} found {len(result)} shots')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment