Skip to content

Instantly share code, notes, and snippets.

@jkanclerz
Forked from nad2000/fib.py
Created March 31, 2018 20:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkanclerz/ec134b3540a2678ea10427a83da9443a to your computer and use it in GitHub Desktop.
Save jkanclerz/ec134b3540a2678ea10427a83da9443a to your computer and use it in GitHub Desktop.
Threading with Python
def fib(n):
    """Return the n-th Fibonacci number using plain double recursion.

    Every ``n`` of 2 or less yields 1; larger values are the sum of the two
    preceding terms.  Nothing is cached, so the work grows very quickly
    with ``n``.
    """
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
import time
# Pages fetched by the download examples in this script.
urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/',
    # etc..
]

# Four worker threads share the downloads.
pool = ThreadPool(processes=4)

# map() hands each URL to a worker and collects the opened responses
# in the same order as `urls`.
results = pool.map(urllib2.urlopen, urls)

# No more work will be submitted: stop the pool and wait for the workers.
pool.close()
pool.join()
#%%
class Timer(object):
    """Minimal class-level stopwatch.

    All state lives on the class itself, so there is exactly one timer:
    ``start()`` stores the current time and ``elapsed()`` / ``show()``
    report the seconds passed since the most recent ``start()``.
    ``start()`` has to be called first; before that ``_start`` does not
    exist and ``elapsed()`` raises ``AttributeError``.
    """

    @classmethod
    def start(cls):
        """Remember the current time as the beginning of the interval."""
        cls._start = time.time()

    @classmethod
    def elapsed(cls):
        """Return the seconds (float) since the last ``start()`` call."""
        return time.time() - cls._start

    @classmethod
    def show(cls):
        """Print the elapsed time with five decimal places."""
        # A single parenthesised argument prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("*** Elapsed: %0.5f" % cls.elapsed())
#%% --- Single thread ----- #
# Baseline: open every URL one after another on the main thread.
Timer.start()
results = []
for url in urls:
    result = urllib2.urlopen(url)
    results.append(result)
Timer.show()

#%% ------- 4 Pool ------- #
Timer.start()
pool = ThreadPool(4)
results = pool.map(urllib2.urlopen, urls)
Timer.show()
# Stop the workers after the timing has been printed so that idle threads
# from this run are gone before the next measurement starts.
pool.close()
pool.join()

#%% ------- 8 Pool ------- #
Timer.start()
pool = ThreadPool(8)
results = pool.map(urllib2.urlopen, urls)
Timer.show()
pool.close()
pool.join()

#%% ------- 13 Pool ------- #
Timer.start()
pool = ThreadPool(13)
results = pool.map(urllib2.urlopen, urls)
Timer.show()
pool.close()
pool.join()
# Single thread: 9.35082 sec
# 4 Pool: 3.09248 sec
# 8 Pool: 2.08774 sec
# 13 Pool: 1.67926 sec
# perf1.py
# Time of a long running request
from socket import *
import time
# Connect to the server expected on localhost:25000.
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25000))

# Send the same request over and over, printing how long each round trip
# takes in seconds.
while True:
    # perf_counter() is meant for measuring short intervals and is not
    # affected by system clock adjustments.
    start = time.perf_counter()
    # sendall() keeps writing until the whole request has been handed over.
    sock.sendall(b'30')
    resp = sock.recv(100)
    end = time.perf_counter()
    if not resp:
        # An empty read means the peer closed the connection.
        break
    print(end - start)

sock.close()
# perf2.py
# requests/sec of fast requests
from socket import *
import time
# Connect to the server expected on localhost:25000.
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25000))

# Requests completed since the monitor last printed.  The main loop
# increments it and the monitor resets it without any locking, so the
# figures are approximate.
n = 0

from threading import Thread


def monitor():
    """Once per second print the request counter ``n`` and reset it to 0."""
    global n
    while True:
        time.sleep(1)
        print(n, 'reqs/sec')
        n = 0


# Daemon thread: the monitor never returns, so it must not keep the
# process alive once the request loop below has stopped.
Thread(target=monitor, daemon=True).start()

while True:
    # sendall() keeps writing until the whole request has been handed over.
    sock.sendall(b'1')
    resp = sock.recv(100)
    if not resp:
        # An empty read means the peer closed the connection.
        break
    n += 1

sock.close()
"""
A more realistic thread pool example
Practical threaded programming with Python
http://www.ibm.com/developerworks/aix/library/au-threadingpython/
"""
import time
import threading
import Queue
import urllib2
#%%
class Consumer(threading.Thread):
    """Worker thread that opens every URL it receives from a queue.

    Items are taken from ``queue`` until the string ``'quit'`` arrives,
    which ends the thread.  Every other item is passed to
    ``urllib2.urlopen``.
    """

    def __init__(self, queue):
        """Store the queue this worker will read from."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        """Process queue items until the ``'quit'`` marker is received."""
        while True:
            # get() blocks until an item is available.
            content = self._queue.get()
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
            # The body is not used; close the response so its connection
            # is released instead of lingering until garbage collection.
            response.close()
        # A single parenthesised argument prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print('Bye byes!')
def Producer():
    """Feed a fixed list of URLs to four workers and print the time taken.

    The workers come from ``build_worker_pool``.  After the URLs, one
    ``'quit'`` marker per worker is queued so that each of them stops,
    and the function waits for all of them before printing the elapsed
    seconds.
    """
    # One entry per line with a trailing comma: without the commas Python
    # silently glues adjacent string literals into a single bogus URL.
    urls = [
        'http://www.python.org',
        'http://www.yahoo.com',
        'http://www.scala.org',
        'http://www.google.com',
        # etc..
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()

    # Add the urls to process
    for url in urls:
        queue.put(url)
    # Add the poison pills: one per worker, because each worker consumes
    # exactly one before it exits.
    for _ in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()

    # A single parenthesised argument prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print('Done! Time taken: {}'.format(time.time() - start_time))
def build_worker_pool(queue, size):
    """Create and start ``size`` Consumer threads reading from ``queue``.

    Returns the started threads as a list, in creation order.
    """
    started = []
    for _ in range(size):
        consumer = Consumer(queue)
        consumer.start()
        started += [consumer]
    return started
#%%
# Run the demo only when this file is executed directly.
if __name__ == "__main__":
    Producer()
# server.py
# Fib microservice
from socket import *
from fib import fib
from threading import Thread
def fib_server(address):
    """Accept TCP connections on ``address`` and serve them one at a time.

    Each accepted client is passed to ``fib_handler``; the next connection
    is not accepted until that call returns.  The loop never ends on its
    own, so the function only exits through an exception.

    Args:
        address: ``(host, port)`` pair to bind to.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    # The with-block closes the listening socket when the loop is left
    # through an exception (e.g. KeyboardInterrupt).
    with sock:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()
            print("Connection", addr)
            try:
                fib_handler(client)
            finally:
                # Release the connection even if the handler raised;
                # closing an already closed socket is harmless.
                client.close()
def fib_handler(client):
    """Answer Fibonacci requests on one connection until the peer closes it.

    Every chunk received (up to 100 bytes) is parsed as an integer ``n``;
    the reply is ``fib(n)`` as ASCII digits followed by a newline.  The
    client socket is closed when the function exits, normally or not.

    Args:
        client: connected socket to serve.

    Raises:
        ValueError: if a received chunk is not a valid integer.
    """
    with client:
        while True:
            req = client.recv(100)
            if not req:
                # Empty read: the peer closed the connection.
                break
            n = int(req)
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            # sendall() keeps writing until the whole reply has been sent.
            client.sendall(resp)
    print("Closed")
# Serve on port 25000; the empty host string binds to all interfaces.
fib_server(('', 25000))
# server.py
# Fib microservice
from socket import *
from fib import fib
from threading import Thread
def fib_server(address):
    """Accept TCP connections on ``address``, one thread per client.

    Each accepted client is handed to ``fib_handler`` running in its own
    daemon thread, so the accept loop continues immediately.  The loop
    never ends on its own, so the function only exits through an exception.

    Args:
        address: ``(host, port)`` pair to bind to.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    # The with-block closes the listening socket when the loop is left
    # through an exception (e.g. KeyboardInterrupt).
    with sock:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()
            print("Connection", addr)
            Thread(target=fib_handler, args=(client,), daemon=True).start()
def fib_handler(client):
    """Answer Fibonacci requests on one connection until the peer closes it.

    Every chunk received (up to 100 bytes) is parsed as an integer ``n``;
    the reply is ``fib(n)`` as ASCII digits followed by a newline.  The
    client socket is closed when the function exits, normally or not.

    Args:
        client: connected socket to serve.

    Raises:
        ValueError: if a received chunk is not a valid integer.
    """
    with client:
        while True:
            req = client.recv(100)
            if not req:
                # Empty read: the peer closed the connection.
                break
            n = int(req)
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            # sendall() keeps writing until the whole reply has been sent.
            client.sendall(resp)
    print("Closed")
# Serve on port 25000; the empty host string binds to all interfaces.
fib_server(('', 25000))
# server.py
# Fib microservice
from socket import *
from fib import fib
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool
# Process pool with four workers, used by fib_handler to run fib().
pool = Pool(max_workers=4)
def fib_server(address):
    """Accept TCP connections on ``address``, one thread per client.

    Each accepted client is handed to ``fib_handler`` running in its own
    daemon thread, so the accept loop continues immediately.  The loop
    never ends on its own, so the function only exits through an exception.

    Args:
        address: ``(host, port)`` pair to bind to.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    # The with-block closes the listening socket when the loop is left
    # through an exception (e.g. KeyboardInterrupt).
    with sock:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()
            print("Connection", addr)
            Thread(target=fib_handler, args=(client,), daemon=True).start()
def fib_handler(client):
    """Answer Fibonacci requests on one connection, computing in the pool.

    Every chunk received (up to 100 bytes) is parsed as an integer ``n``.
    ``fib(n)`` is submitted to the module-level ``pool`` and this function
    waits for the result, then replies with it as ASCII digits followed by
    a newline.  The client socket is closed when the function exits,
    normally or not.

    Args:
        client: connected socket to serve.

    Raises:
        ValueError: if a received chunk is not a valid integer.
    """
    with client:
        while True:
            req = client.recv(100)
            if not req:
                # Empty read: the peer closed the connection.
                break
            n = int(req)
            future = pool.submit(fib, n)
            # result() blocks until the pool has finished the computation.
            result = future.result()
            resp = str(result).encode('ascii') + b'\n'
            # sendall() keeps writing until the whole reply has been sent.
            client.sendall(resp)
    print("Closed")
# Start the server only when this file is run as a script.  With a process
# pool in use, worker processes started via "spawn" import this module
# again; without the guard each of them would try to start the server too.
if __name__ == '__main__':
    # Port 25000; the empty host string binds to all interfaces.
    fib_server(('', 25000))
"""
Standard Producer/Consumer Threading Pattern
"""
import time
import threading
import Queue
#%%
class Consumer(threading.Thread):
    """Worker thread that prints every message it receives from a queue.

    Messages are taken from ``queue`` until the string ``'quit'`` (the
    "poison pill") arrives, which ends the thread.
    """

    def __init__(self, queue):
        """Store the queue this worker will read from."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        """Print queue messages until the ``'quit'`` marker is received."""
        while True:
            # queue.get() blocks the current thread until
            # an item is retrieved.
            msg = self._queue.get()
            # Checks if the current message is
            # the "Poison Pill"
            if isinstance(msg, str) and msg == 'quit':
                # if so, exits the loop
                break
            # "Processes" (or in our case, prints) the queue item.
            # The one-element tuple keeps %-formatting correct when msg
            # is itself a tuple.
            print("I'm a thread, and I received %s!!" % (msg,))
        # Always be friendly!
        print('Bye byes!')
def Producer():
    """Feed one Consumer thread a message per second for about five seconds.

    A queue is shared with a single Consumer.  Messages are queued until
    five seconds have passed since the start, then the ``'quit'`` marker
    is queued and the function waits for the consumer to finish.
    """
    # The queue is the only channel between this function and the worker.
    mailbox = Queue.Queue()

    # start() runs the consumer's run() method in a new thread.
    consumer = Consumer(mailbox)
    consumer.start()

    began = time.time()
    while True:
        # Stop producing once five seconds have gone by.
        if time.time() - began >= 5:
            break
        # "Produce" a piece of work for the consumer to process.
        mailbox.put('something at %s' % time.time())
        # Pause so the queue is not flooded with messages.
        time.sleep(1)

    # The "poison pill" tells the consumer to leave its loop.
    mailbox.put('quit')
    # Wait for the consumer thread to shut down.
    consumer.join()
#%%
# Run the demo only when this file is executed directly.
if __name__ == "__main__":
    Producer()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment