Skip to content

Instantly share code, notes, and snippets.

@peketamin
Created May 10, 2019 13:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peketamin/d6c8683779d44f4cf107cd231a8dc876 to your computer and use it in GitHub Desktop.
Save peketamin/d6c8683779d44f4cf107cd231a8dc876 to your computer and use it in GitHub Desktop.
Thread and BoundedSemaphore
import time
from threading import Thread, BoundedSemaphore
# Upper bound on how many threads may hold the limiter at the same time.
MAX_CONCURRENT_THREADS = 3

# Shared gate: each worker takes one slot before doing its work and gives it
# back afterwards, so at most MAX_CONCURRENT_THREADS workers proceed at once.
thread_limiter = BoundedSemaphore(MAX_CONCURRENT_THREADS)
class MyThread(Thread):
    """Worker thread whose body is gated by the module-level ``thread_limiter``.

    Every instance takes one slot of the shared semaphore before doing its
    work, so the number of instances executing the body at any moment never
    exceeds the semaphore's initial value.
    """

    def run(self):
        """Wait for a free slot, sleep three seconds, then print a timestamp."""
        # The semaphore is a context manager: it is acquired on entry and
        # released on exit, including when the body raises.
        with thread_limiter:
            time.sleep(3)
            print(f'hello: {time.time()}')
if __name__ == '__main__':
    # Create ten workers, then launch them in creation order; the limiter
    # inside MyThread.run decides how many make progress simultaneously.
    workers = [MyThread() for _ in range(10)]
    for worker in workers:
        worker.start()
@peketamin
Copy link
Author

hello: 1557495011.279253
hello: 1557495011.279388
hello: 1557495011.2799518
hello: 1557495014.285185
hello: 1557495014.285435
hello: 1557495014.285822
hello: 1557495017.285931
hello: 1557495017.2861922
hello: 1557495017.286457
hello: 1557495020.2913122

@peketamin
Copy link
Author

import time
from threading import Thread, BoundedSemaphore



class MyThread(Thread):
    """Worker thread that limits how many instances run their body at once.

    The semaphore is a class attribute, so all instances share the same
    object and therefore the same pool of slots.
    """

    # Maximum number of instances allowed inside the guarded section together.
    MAX_CONCURRENT_THREADS = 3
    # Created once, at class-definition time, and shared by every instance.
    thread_limiter = BoundedSemaphore(value=MAX_CONCURRENT_THREADS)

    def run(self):
        """Wait for a free slot, sleep three seconds, then print a timestamp."""
        # The semaphore is a context manager: it is acquired on entry and
        # released on exit, including when the body raises.
        with self.thread_limiter:
            time.sleep(3)
            print(f'hello: {time.time()}')


if __name__ == '__main__':
    # Create ten workers, then launch them in creation order; the limiter
    # inside MyThread.run decides how many make progress simultaneously.
    workers = [MyThread() for _ in range(10)]
    for worker in workers:
        worker.start()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment