Skip to content

Instantly share code, notes, and snippets.

@tchar
Last active August 6, 2021 01:08
Show Gist options
  • Save tchar/8ae29db0ce500ca527caaf9dc12ba6de to your computer and use it in GitHub Desktop.
Save tchar/8ae29db0ce500ca527caaf9dc12ba6de to your computer and use it in GitHub Desktop.
Singleton service that shares resources with a runner thread and supports stopping it and waking it up
from typing import List
from threading import Thread, RLock, Event
import time
import uuid
import logging
# Configure the root logger at import time so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger; ServiceThread derives its per-thread child logger from it.
logger = logging.getLogger(__name__)
class ServiceThread(Thread):
    """Worker thread that repeatedly appends its run counter to a shared list.

    The loop in :meth:`run` continues until :meth:`stop` is called. Between
    iterations the thread waits up to ``sleep_for`` seconds; :meth:`wake`
    ends that wait early so the next iteration starts immediately.
    """

    def __init__(self, lock: RLock, sleep_for: float, data: List):
        """Create the worker without starting it.

        Args:
            lock: Lock held while ``data`` is mutated by this thread.
            sleep_for: Maximum number of seconds to wait between iterations.
            data: List shared with the caller; it is mutated in place.
        """
        super().__init__()
        self._lock = lock
        self._sleep_for = sleep_for
        self._shared_data = data
        self._run_count = 0
        self._woke_event = Event()
        self._stopped_event = Event()
        self._id = uuid.uuid4()
        self._logger = logger.getChild('Thread (id={})'.format(self.id))

    @property
    def id(self) -> str:
        """Return the last dash-separated group of this thread's UUID."""
        return str(self._id).split('-')[-1]

    def _run(self) -> None:
        """Perform one iteration: append the run counter, then increment it."""
        # Write run code
        with self._lock:
            self._shared_data.append(self._run_count)
            self._run_count += 1

    def run(self) -> None:
        """Thread body: iterate, then wait for a wake-up or the timeout."""
        self._logger.info('Starting thread')
        while not self._stopped_event.is_set():
            self._run()
            # %s shows the interval as given; the previous '{:.0f}' format
            # rounded fractional intervals (0.5 was reported as "0s").
            self._logger.info('Sleeping for %ss', self._sleep_for)
            self._woke_event.wait(self._sleep_for)
            self._logger.info('Woke up')
            # Reset so the next wait blocks again until wake() or the timeout.
            self._woke_event.clear()
        self._logger.info('Stopping thread')

    def wake(self) -> None:
        """Interrupt the current wait so the loop proceeds immediately."""
        self._woke_event.set()

    def stop(self) -> None:
        """Ask the loop to exit and wake the thread so it notices promptly."""
        self._stopped_event.set()
        self.wake()
class Service:
    """Singleton that owns the shared data list and its runner thread."""

    # Lazily created instance returned by get_instance().
    _instance = None

    def __init__(self):
        """Set up the lock, the initial shared data, and no runner thread."""
        self.run_interval = 1
        self._lock = RLock()
        self._thread = None
        self._shared_data = ['some', 'data']

    @classmethod
    def get_instance(cls) -> 'Service':
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            # Instantiate cls (not a hard-coded Service) so a subclass that
            # calls get_instance() gets an instance of its own type.
            cls._instance = cls()
        return cls._instance

    @property
    def is_running(self) -> bool:
        """Whether a runner thread exists and is still alive.

        A thread that has terminated (for example because an iteration
        raised) is reported as not running, so start() can replace it.
        """
        thread = self._thread
        return thread is not None and thread.is_alive()

    @property
    def data(self):
        """Return a copy of the shared list, taken while holding the lock."""
        with self._lock:
            return self._shared_data.copy()

    def add_data(self, value):
        """Append ``value`` to the shared list while holding the lock."""
        with self._lock:
            self._shared_data.append(value)

    def run(self) -> None:
        """Start the runner if it is not running; otherwise wake it."""
        if not self.is_running:
            self.start()
            return
        self._thread.wake()

    def start(self) -> None:
        """Create and start the runner thread; no-op if one is running."""
        if self.is_running:
            return
        thread = ServiceThread(self._lock, self.run_interval, self._shared_data)
        thread.start()
        # Publish only after start() so is_running never sees an unstarted thread.
        self._thread = thread

    def stop(self) -> None:
        """Stop the runner thread, wait for it to finish, and forget it."""
        thread = self._thread
        if thread is None:
            return
        # Also safe for a thread that already died: join() returns at once.
        thread.stop()
        thread.join()
        self._thread = None
def main():
    """Demo: start the service, add a value and print the data once per second."""
    # Obtain the service before the try block: if this call raised inside it,
    # the finally clause would hit an unbound `service` and mask the real error.
    service = Service.get_instance()
    try:
        service.start()
        for i in range(10):
            service.add_data(-i)
            time.sleep(1)
            print('Data={}'.format(service.data))
    except Exception as e:
        logger.exception(e)
    finally:
        # Always stop the runner thread so the process can exit.
        service.stop()
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment