Last active
February 16, 2020 02:29
-
-
Save guerbai/3450008bc2c3e8df6587f8fddebb1483 to your computer and use it in GitHub Desktop.
并发洗数 (concurrent data cleaning)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
from queue import Queue | |
# Shared work queue: produce() fills it, worker() drains it.
q = Queue()
# Number of consumer threads spawned by parallel_work().
NUM_WORKER_THREADS = 150
def parallel_work(q, produce, work, num_worker_threads):
    """Run `produce` to completion, then drain `q` with daemon worker threads.

    The producer is joined before any worker starts, so all items are queued
    up front; `q.join()` then blocks until every item has been task_done()'d.
    """
    feeder = threading.Thread(target=produce, daemon=True)
    feeder.start()
    feeder.join()
    for _ in range(num_worker_threads):
        consumer = threading.Thread(target=work, daemon=True)
        consumer.start()
    # Wait until every queued item has been marked done by a worker.
    q.join()
def produce():
    """Enqueue one work item per (id, name) entry of the global INFOS dict.

    Every item starts with retry_time == 1; worker() bumps it on failure.
    """
    for record_id, record_name in INFOS.items():
        q.put({'id': record_id, 'name': record_name, 'retry_time': 1})
def worker():
    """Drain the global queue, retrying each failed item up to 3 times.

    NOTE(review): the empty()/get() pair is race-prone in general, but here
    the producer is joined before workers start, so the queue only shrinks.
    """
    while not q.empty():
        item = q.get()
        try:
            # call logic method do some thing.
            log('%s, %s call logic method success' % (str(item['id']), item['name']))
        except Exception:
            # Failed: re-queue with an incremented retry counter, up to 3 retries.
            if item['retry_time'] <= 3:
                q.put({'id': item['id'], 'name': item['name'], 'retry_time': item['retry_time'] + 1})
        else:
            log('success', 'remain size: %d' % q.qsize())
        finally:
            # Exactly one task_done() per get(), on every path. The original
            # could call task_done() twice for an item whose retries were
            # exhausted, which raises ValueError and kills the worker thread.
            q.task_done()
# Kick off the produce-then-consume pipeline with 150 worker threads.
parallel_work(q, produce, worker, NUM_WORKER_THREADS)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import time | |
from multiprocessing import Pool, Queue, Process | |
# Process-shared queue: the producer process feeds it, consumer processes drain it.
q = Queue()
def produce():
    """Feed 3,500,000 consecutive integers into the shared queue.

    Items are pushed in bursts of 1000 with a half-second pause between
    bursts, pacing the consumers.
    """
    total = 0
    while total < 3500000:
        burst_end = total + 1000
        while total < burst_end:
            q.put(total)
            total += 1
        time.sleep(0.5)
def consume(i):
    # Worker-process loop (Python 2): pull and print items until the queue
    # stays empty for 10 seconds, at which point q.get raises Queue.Empty
    # (uncaught), ending this consumer process.
    while True:
        item = q.get(True, 10)
        print 'process %s deal : ' %i, item
        # Brief pause so one fast consumer doesn't starve the others.
        time.sleep(0.001)
if __name__ == '__main__':
    # One producer process plus ten consumer processes sharing the queue.
    number_of_process = 10
    workers = [Process(target=consume, args=(i,)) for i in range(number_of_process)]
    produce_p = Process(target=produce)
    produce_p.start()
    # Start and join with explicit loops. The original used map() purely for
    # its side effects, which silently does nothing on Python 3 where map()
    # is lazy -- the workers would never be started or joined.
    for w in workers:
        w.start()
    for w in workers:
        w.join()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from functools import wraps | |
from timeit import default_timer | |
import gevent | |
from gevent.queue import Queue | |
def gevent_throttle(calls_per_sec=0):
    """Decorates a Greenlet function for throttling.

    Caps calls to the wrapped function at roughly *calls_per_sec* per second
    across all greenlets sharing the decorated function; 0 (the default)
    disables throttling entirely.
    """
    # Minimum spacing between two consecutive calls, in seconds.
    interval = 1. / calls_per_sec if calls_per_sec else 0
    def decorate(func):
        # Single-element lists serve as mutable cells shared by every call of
        # throttled_func (pre-`nonlocal` closure idiom).
        blocked = [False] # has to be a list to not get localised inside the while loop
        # otherwise, UnboundLocalError: local variable 'blocked' referenced before assignment
        last_time = [0] # ditto
        @wraps(func) # propagates docstring
        def throttled_func(*args, **kwargs):
            # Spin (yielding) until we win the blocked flag; only one greenlet
            # at a time proceeds past the flag to do the pacing check.
            while True:
                # give other greenlets a chance to run, otherwise we
                # might get stuck while working thread is sleeping and the block is ON
                gevent.sleep(0)
                if not blocked[0]:
                    blocked[0] = True
                    # check if actually might need to pause
                    if calls_per_sec:
                        last, current = last_time[0], default_timer()
                        elapsed = current - last
                        # Too soon since the previous call: sleep off the remainder.
                        if elapsed < interval:
                            gevent.sleep(interval - elapsed)
                        last_time[0] = default_timer()
                    blocked[0] = False
                    return func(*args, **kwargs)
        return throttled_func
    return decorate
@gevent_throttle(100)
def controled_sleep():
    """No-op whose call rate is capped at 100/s by the throttle decorator."""
    pass
if __name__ == "__main__": | |
i = 0 | |
while True: | |
controled_sleep() | |
i += 1 | |
if i % 100 == 0: | |
print (i) | |
if i > 10000: | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment