Created
May 5, 2021 16:10
-
-
Save jgillmanjr/dd907522f4fd2a91ded871e47f8ccfcc to your computer and use it in GitHub Desktop.
A dumb class to hold a library of connections to checkout and checkin for use with threaded applications
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ConnectionLibrary:
    """
    A fixed-size library of connection objects for use by threads.

    Every connection is created eagerly in the constructor. Threads borrow a
    connection with ``checkout`` and hand it back with ``checkin``; both
    operations are serialised by a single lock so that two threads can never
    be handed the same connection.
    """

    class LibraryConnection(TypedDict):
        # The object returned by the connection constructor.
        conn_obj: Any
        # True while the connection is free to be checked out.
        available: bool

    def __init__(self, conn_constructor: Callable, conn_params_dict: dict, connection_count: int):
        """
        Construct a pool of connections for use in threaded applications

        :param conn_constructor: callable invoked once per connection as
            ``conn_constructor(**conn_params_dict)``
        :param conn_params_dict: keyword arguments passed to every constructor call
        :param connection_count: number of connections to create; they are
            indexed ``0 .. connection_count - 1``
        """
        # Local import so the class does not rely on a module-level import.
        import threading

        # Guards every read/write of the 'available' flags.
        self._lock = threading.Lock()
        self.conn_library: Dict[int, 'ConnectionLibrary.LibraryConnection'] = {
            i: {'conn_obj': conn_constructor(**conn_params_dict), 'available': True}
            for i in range(connection_count)
        }

    def checkout(self) -> Tuple[int, Any]:
        """
        Check out the first available connection

        :return: ``(index, connection object)``; pass the index to ``checkin``
            when done
        :raises RuntimeError: if every connection is currently checked out
        """
        # The availability test and the flag flip must happen atomically,
        # otherwise two threads could both claim the same connection.
        with self._lock:
            for index, entry in self.conn_library.items():
                if entry['available']:
                    entry['available'] = False
                    return index, entry['conn_obj']
        raise RuntimeError('Lol, no more connections available. You probably screwed up somewhere.')

    def checkin(self, connection_index: int) -> None:
        """
        Check a connection back in

        :param connection_index: index previously returned by ``checkout``
        :return:
        :raises KeyError: if ``connection_index`` is not in the library
        """
        with self._lock:
            self.conn_library[connection_index]['available'] = True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment