-
-
Save yuuri/544fc947a84bb6245e03d482a91ad59a to your computer and use it in GitHub Desktop.
多进程使用队列进行分类（将请求分为三类，分别请求不同的服务器）— multiprocessing with a shared queue to classify requests into groups, each sent to a different server.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Pool | |
from multiprocessing import Queue | |
import random | |
import time | |
from collections import Counter | |
import logging | |
from concurrent_log_handler import ConcurrentRotatingFileHandler | |
from multiprocessing import Manager | |
import argparse | |
# Shared log file path; every logger produced by LogAdapter appends here.
log_file = 'q_process.log'
class LogAdapter(object):
    """Factory for process-safe loggers writing to the shared ``log_file``.

    Uses ConcurrentRotatingFileHandler so several worker processes can
    append to the same file without interleaving corruption.
    """

    def set_log(self, log_name):
        """Return a DEBUG-level logger named *log_name* bound to ``log_file``.

        BUGFIX: the original appended a fresh handler on every call, so
        calling set_log twice with the same name duplicated every log
        line; now a handler is attached only once per logger.
        """
        logger = logging.getLogger(log_name)
        logger.setLevel(level=logging.DEBUG)
        if not logger.handlers:  # avoid stacking duplicate handlers
            log_handler = ConcurrentRotatingFileHandler(log_file, encoding='utf-8')
            log_handler.setLevel(logging.DEBUG)
            log_format = logging.Formatter(
                '%(asctime)s.%(msecs)03d %(name)s %(process)d %(levelname)s %(message)s',
                datefmt='%Y-%m-%d %H:%M:%S')
            log_handler.setFormatter(log_format)
            logger.addHandler(log_handler)
        return logger
# Module-wide logger shared by the main process and all pool workers.
LOG = LogAdapter().set_log('main')
def options():
    """Parse the command line.

    Returns a ``(namespace, parser)`` tuple so callers can both read the
    parsed values and reuse the parser (e.g. for help output).
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-d', '--database',
                            help='set your database connection config name')
    arg_parser.add_argument('-a', '--api',
                            help='set your api server name')
    arg_parser.add_argument('-b', '--bulk', action='store_true',
                            help='set whether bulk run (option parameter)')
    arg_parser.add_argument('-t', '--test', action='store_true',
                            help='set whether run test case (option parameter)')
    arg_parser.add_argument('-s', '--specify',
                            help='set whether run bulk run specify date (option parameter)')
    arg_parser.add_argument('-i', '--iplist',
                            help='set multi server ip, use ";" to separate')
    parsed = arg_parser.parse_args()
    return parsed, arg_parser
# Parse the CLI once at import time; the namespace drives module configuration.
option_name, parser = options()
# Semicolon-separated backend server IPs, or None when -i was not given.
IP_LIST = option_name.iplist
# Worker slots granted per server IP.
WORK_NUM = 18
# Hostname placeholder inside project URLs that gets swapped for a real IP.
URL_KEY = 'mayun.cn'
# Filled by the pool callback: finished task dicts and the IP each task used.
process_list = []
url_list = []
def set_queue(IP_LIST):
    """Build the shared token queue handed to worker processes.

    Each token is a one-entry dict mapping URL_KEY to a replacement host.
    With a non-empty ``IP_LIST`` ("ip1;ip2;..."), every IP receives
    WORK_NUM tokens; otherwise WORK_NUM no-op tokens (URL_KEY mapped to
    itself) are queued. Returns ``(queue, process_count)``.
    """
    if not IP_LIST:
        process_number = WORK_NUM
        Q = Manager().Queue(process_number)
        for _ in range(WORK_NUM):
            Q.put({URL_KEY: URL_KEY})
        return Q, process_number

    ips = IP_LIST.split(';')
    process_number = WORK_NUM * len(ips)
    Q = Manager().Queue(process_number)
    for _ in range(WORK_NUM):
        for ip in ips:
            Q.put({URL_KEY: ip})
    return Q, process_number
# NOTE(review): never read or written anywhere in this file — candidate for removal.
project = []
def log(process_num):
    """Record the run configuration: worker count, IP list, process count."""
    message = 'Set Work number is {} ip_list is {} process count will be {}'.format(
        WORK_NUM, IP_LIST, process_num)
    LOG.warning(message)
def put_queue(q, url, lock):
    """Return the token dict *url* to queue *q* while holding *lock*."""
    with lock:
        q.put(url)
def sub_process(i, total, q, lock):
    """Worker task: borrow an IP token from *q*, rewrite the project URL, return the token.

    i     -- task dict holding at least 'PROJECT_URL'; mutated in place.
    total -- total task count, passed through for the progress callback.
    q     -- shared queue of {URL_KEY: replacement-host} token dicts.
    lock  -- shared lock guarding the token return via put_queue().

    Returns (i, total, url) so the pool callback can record progress.
    """
    url = q.get()
    size = q.qsize()
    # BUGFIX: dict views are not subscriptable on Python 3 — url.keys()[0]
    # and url.values()[0] raised TypeError; unpack the single item instead.
    url_key, url_value = next(iter(url.items()))
    origin_url = i['PROJECT_URL']
    new_url = origin_url.replace(url_key, url_value)
    i['PROJECT_URL'] = new_url
    number = random.random()  # simulate 0-1 s of request latency
    time.sleep(number)
    put_queue(q, url, lock)  # hand the token back for the next worker
    print('index is {} url key is {} value is {} sleep {} new_url is {} size {}'.format(i, url_key, url_value, number,
                                                                                        new_url, size))
    LOG.info(
        'index is {} url key is {} value is {} sleep {} new_url is {} size {}'.format(i, url_key, url_value, number,
                                                                                      new_url, size))
    return i, total, url
def sub_process2(i, total, q):
    """Lock-free variant of sub_process: the token goes back with a bare q.put().

    Sleeps a random 5-10 s to simulate a slow request. Same contract
    otherwise: mutates i['PROJECT_URL'] and returns (i, total, url).
    """
    url = q.get()
    size = q.qsize()
    # BUGFIX: dict views are not subscriptable on Python 3 — url.keys()[0]
    # and url.values()[0] raised TypeError; unpack the single item instead.
    url_key, url_value = next(iter(url.items()))
    origin_url = i['PROJECT_URL']
    new_url = origin_url.replace(url_key, url_value)
    i['PROJECT_URL'] = new_url
    number = random.randint(5, 10)
    time.sleep(number)
    q.put(url)
    print('index is {} url key is {} value is {} sleep {} new_url is {} size {}'.format(i, url_key, url_value, number,
                                                                                        new_url, size))
    LOG.info(
        'index is {} url key is {} value is {} sleep {} new_url is {} size {}'.format(i, url_key, url_value, number,
                                                                                      new_url, size))
    return i, total, url
def record_log(args):
    """Pool callback: record a finished task and print the running completion rate.

    args -- the (index, total, url) tuple returned by sub_process/sub_process2.

    Appends the IP the task used to the module-level url_list and the task
    dict to process_list, then prints the percentage of tasks completed.
    """
    index = args[0]
    total = args[1]
    url = args[2]
    # BUGFIX: url.keys()[0] raises TypeError on Python 3 (dict views are
    # not subscriptable); take the single key via an iterator instead.
    url_key = next(iter(url))
    url_list.append(url[url_key])
    process_list.append(index)
    number = len(process_list)
    percent = number / float(total)
    print('now index is {} percent is {:.2%} url is {}'.format(index, percent, url))
def main():
    """Drive the run: build the token queue, fan 1000 tasks out to a pool, tally IP usage."""
    manager = Manager()
    shared_lock = manager.Lock()
    token_queue, process_num = set_queue(IP_LIST)
    log(process_num)
    worker_pool = Pool(processes=process_num)
    all_data = [{'PROJECT_URL': 'https://mayun.cn/{}'.format(n)} for n in range(1000)]
    for task in all_data:
        worker_pool.apply_async(sub_process, (task, 1000, token_queue, shared_lock),
                                callback=record_log)
    worker_pool.close()
    worker_pool.join()
    # Frequency of each server IP actually used across all tasks.
    result = Counter(url_list)
    print(result)
    LOG.info(result)
    log(process_num)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment