Skip to content

Instantly share code, notes, and snippets.

@jgillmanjr
Created May 5, 2021 16:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jgillmanjr/dd907522f4fd2a91ded871e47f8ccfcc to your computer and use it in GitHub Desktop.
Save jgillmanjr/dd907522f4fd2a91ded871e47f8ccfcc to your computer and use it in GitHub Desktop.
A dumb class to hold a library of connections to checkout and checkin for use with threaded applications
class ConnectionLibrary:
"""
A library to hold various connection objects for use by threads
"""
class LibraryConnection(TypedDict):
conn_obj: Any
available: bool
def __init__(self, conn_constructor: Callable, conn_params_dict: dict, connection_count: int):
"""
Construct a pool of connections for use in threaded applications
:param conn_constructor:
:param conn_params_dict:
:param connection_count:
"""
self.conn_library: Dict[int, 'ConnectionLibrary.LibraryConnection'] = {
i: {'conn_obj': conn_constructor(**conn_params_dict), 'available': True} for i in range(connection_count)
}
def checkout(self) -> Tuple[int, Any]:
"""
Checkout an SFDC connection
:return:
"""
for i, c in self.conn_library.items():
if c['available']:
c['available'] = False
return i, c['conn_obj']
raise Exception('Lol, no more connections available. You probably screwed up somewhere.')
def checkin(self, connection_index) -> None:
"""
Check a connection back in
:param connection_index:
:return:
"""
self.conn_library[connection_index]['available'] = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment