@yuuri
Created December 10, 2020 10:34
Multiprocessing with a queue for request classification (split requests into three classes and send each class to a different server)
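Before the full script, a minimal sketch of the pattern it relies on, assuming only the standard library (demo_worker and the IP addresses are illustrative, not part of the script below): a Manager queue is pre-seeded with one token per server; each worker draws a token, targets that server, and returns the token, so concurrent requests stay evenly spread across servers.

from multiprocessing import Pool, Manager

def demo_worker(task, q):
    server = q.get()                          # draw a server token
    result = 'task {} -> {}'.format(task, server)
    q.put(server)                             # hand the token back
    return result

if __name__ == '__main__':
    q = Manager().Queue()
    for ip in ('10.0.0.1', '10.0.0.2', '10.0.0.3'):   # illustrative addresses
        q.put(ip)
    with Pool(processes=3) as pool:
        for line in pool.starmap(demo_worker, [(t, q) for t in range(9)]):
            print(line)

The full gist script follows.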
from multiprocessing import Pool, Manager
import random
import time
from collections import Counter
import logging
from concurrent_log_handler import ConcurrentRotatingFileHandler
import argparse

log_file = 'q_process.log'
class LogAdapter(object):
    def __init__(self):
        pass

    def set_log(self, log_name):
        # Build a logger whose rotating file handler is safe to share
        # across processes (concurrent_log_handler does the file locking).
        logger = logging.getLogger(log_name)
        logger.setLevel(level=logging.DEBUG)
        log_handler = ConcurrentRotatingFileHandler(log_file, encoding='utf-8')
        log_handler.setLevel(logging.DEBUG)
        log_format = logging.Formatter(
            '%(asctime)s.%(msecs)03d %(name)s %(process)d %(levelname)s %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        log_handler.setFormatter(log_format)
        logger.addHandler(log_handler)
        return logger


LOG = LogAdapter().set_log('main')
def options():
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--database', help='set your database connection config name')
    parser.add_argument('-a', '--api', help='set your api server name')
    parser.add_argument('-b', '--bulk', help='set whether to bulk run (optional)', action='store_true')
    parser.add_argument('-t', '--test', help='set whether to run the test case (optional)', action='store_true')
    parser.add_argument('-s', '--specify', help='set the date for a bulk run (optional)')
    parser.add_argument('-i', '--iplist', help='set multiple server ips, separated by ";"')
    args = parser.parse_args()
    return args, parser
option_name, parser = options()
IP_LIST = option_name.iplist
WORK_NUM = 18          # worker slots per server
URL_KEY = 'mayun.cn'   # host in the original URLs that gets swapped out
process_list = []      # indices of completed tasks
url_list = []          # server each completed task was routed to
def set_queue(IP_LIST):
    # Seed a managed queue with one "server token" per worker slot. A token
    # maps URL_KEY to the server that slot should hit, so a worker classifies
    # its request simply by which token it happens to draw.
    if IP_LIST:
        IP_LIST = IP_LIST.split(';')
        ip_number = len(IP_LIST)
        process_number = WORK_NUM * ip_number
        Q = Manager().Queue(process_number)
        for _ in range(WORK_NUM):
            for i in IP_LIST:
                Q.put({URL_KEY: i})
    else:
        # No servers given: tokens map the host to itself, leaving URLs as-is.
        process_number = WORK_NUM
        Q = Manager().Queue(process_number)
        for _ in range(WORK_NUM):
            Q.put({URL_KEY: URL_KEY})
    return Q, process_number
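# Worked example: with '-i 10.0.0.1;10.0.0.2' (hypothetical addresses) and
# WORK_NUM = 18, set_queue() seeds 18 * 2 = 36 tokens, 18 per server, and
# main() sizes the pool to 36 workers. Each token is held by at most one
# worker at a time, so in-flight requests stay evenly split across servers.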
project = []  # unused in this script
def log(process_num):
    LOG.warning('Set Work number is {} ip_list is {} process count will be {}'.format(WORK_NUM, IP_LIST, process_num))
def put_queue(q, url, lock):
    # Return the server token to the queue under the shared lock.
    with lock:
        q.put(url)
def sub_process(i, total, q, lock):
    # Draw a server token, rewrite the task URL to point at that server,
    # simulate the request with a short sleep, then hand the token back.
    url = q.get()
    size = q.qsize()
    url_key = list(url.keys())[0]      # dict views are not indexable in Python 3
    url_value = list(url.values())[0]
    origin_url = i['PROJECT_URL']
    new_url = origin_url.replace(url_key, url_value)
    i['PROJECT_URL'] = new_url
    number = random.random()
    time.sleep(number)
    put_queue(q, url, lock)
    message = 'index is {} url key is {} value is {} sleep {} new_url is {} size {}'.format(
        i, url_key, url_value, number, new_url, size)
    print(message)
    LOG.info(message)
    return i, total, url
def sub_process2(i, total, q):
    # Lock-free variant with a longer integer sleep; not used by main().
    url = q.get()
    size = q.qsize()
    url_key = list(url.keys())[0]
    url_value = list(url.values())[0]
    origin_url = i['PROJECT_URL']
    new_url = origin_url.replace(url_key, url_value)
    i['PROJECT_URL'] = new_url
    number = random.randint(5, 10)
    time.sleep(number)
    q.put(url)
    message = 'index is {} url key is {} value is {} sleep {} new_url is {} size {}'.format(
        i, url_key, url_value, number, new_url, size)
    print(message)
    LOG.info(message)
    return i, total, url
def record_log(args):
    # Pool callback: runs in the parent process, so the plain module-level
    # lists are safe to mutate here without extra locking.
    index, total, url = args
    url_key = list(url.keys())[0]
    url_list.append(url[url_key])
    process_list.append(index)
    number = len(process_list)
    percent = number / float(total)
    print('now index is {} percent is {:.2%} url is {}'.format(index, percent, url))
def main():
    manager = Manager()
    lock = manager.Lock()
    q, process_num = set_queue(IP_LIST)
    log(process_num)
    pool = Pool(processes=process_num)
    all_data = [{'PROJECT_URL': 'https://mayun.cn/{}'.format(i)} for i in range(1000)]
    for i in all_data:
        pool.apply_async(sub_process, (i, 1000, q, lock), callback=record_log)
    pool.close()
    pool.join()
    result = Counter(url_list)   # how many tasks each server ended up handling
    print(result)
    LOG.info(result)
    log(process_num)


if __name__ == '__main__':
    main()
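# Usage sketch, assuming the script is saved as q_process.py (the filename is
# an assumption; -i is the only flag main() actually reads):
#
#     python q_process.py -i "10.0.0.1;10.0.0.2;10.0.0.3"
#
# With three servers, each of the 1000 URLs is rewritten to one of the three
# IPs, and the final Counter should report roughly a third of the tasks per
# server. Without -i, the URLs keep the mayun.cn host unchanged.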