Skip to content

Instantly share code, notes, and snippets.

@guerbai
Last active February 16, 2020 02:29
Show Gist options
  • Save guerbai/3450008bc2c3e8df6587f8fddebb1483 to your computer and use it in GitHub Desktop.
Save guerbai/3450008bc2c3e8df6587f8fddebb1483 to your computer and use it in GitHub Desktop.
并发洗数
import threading
from queue import Queue
q = Queue()
NUM_WORKER_THREADS = 150
def parallel_work(q, produce, work, num_worker_threads):
producer = threading.Thread(target=produce)
producer.daemon = True
producer.start()
producer.join()
for i in range(num_worker_threads):
t = threading.Thread(target=work)
t.daemon = True
t.start()
q.join()
def produce():
for id, name in INFOS.items():
q.put({'id': id, 'name': name, 'retry_time': 1})
def worker():
while not q.empty():
item = q.get()
try:
# call logic method do some thing.
log('%s, %s call logic method success' % (str(item['id']), item['name']))
except Exception as e:
q.task_done()
if item['retry_time'] <= 3:
q.put({'id': item['id'], 'name': item['name'], 'retry_time': item['retry_time'] + 1})
continue
q.task_done()
log('success', 'remain size: %d' % q.qsize())
parallel_work(q, produce, worker, NUM_WORKER_THREADS)
# -*- coding: utf-8 -*-
import time
from multiprocessing import Pool, Queue, Process
q = Queue()
def produce():
count = 0
while count < 3500000:
for _ in range(1000):
q.put(count)
count += 1
time.sleep(0.5)
def consume(i):
while True:
item = q.get(True, 10)
print 'process %s deal : ' %i, item
time.sleep(0.001)
if __name__ == '__main__':
number_of_process = 10
workers = []
for i in range(number_of_process):
workers.append(Process(target=consume, args=(i,)))
produce_p = Process(target=produce)
produce_p.start()
map(lambda x: x.start(), workers)
map(lambda x: x.join(), workers)
import time
from functools import wraps
from timeit import default_timer
import gevent
from gevent.queue import Queue
def gevent_throttle(calls_per_sec=0):
"""Decorates a Greenlet function for throttling."""
interval = 1. / calls_per_sec if calls_per_sec else 0
def decorate(func):
blocked = [False] # has to be a list to not get localised inside the while loop
# otherwise, UnboundLocalError: local variable 'blocked' referenced before assignment
last_time = [0] # ditto
@wraps(func) # propagates docstring
def throttled_func(*args, **kwargs):
while True:
# give other greenlets a chance to run, otherwise we
# might get stuck while working thread is sleeping and the block is ON
gevent.sleep(0)
if not blocked[0]:
blocked[0] = True
# check if actually might need to pause
if calls_per_sec:
last, current = last_time[0], default_timer()
elapsed = current - last
if elapsed < interval:
gevent.sleep(interval - elapsed)
last_time[0] = default_timer()
blocked[0] = False
return func(*args, **kwargs)
return throttled_func
return decorate
@gevent_throttle(100)
def controled_sleep():
return
if __name__ == "__main__":
i = 0
while True:
controled_sleep()
i += 1
if i % 100 == 0:
print (i)
if i > 10000:
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment