Skip to content

Instantly share code, notes, and snippets.

@Shellbye
Created April 6, 2017 03:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Shellbye/dbd83af1693987d39f588bfc38ad7172 to your computer and use it in GitHub Desktop.
Save Shellbye/dbd83af1693987d39f588bfc38ad7172 to your computer and use it in GitHub Desktop.
Process a list in parallel with multiprocessing
# -*- coding:utf-8 -*-
# Created by shellbye on 2017/3/10.
import multiprocessing
def multi_process_list(process_num=None, func=None, list_to_process=None, **kwargs):
    """
    Process a list in parallel by splitting it into contiguous segments.

    One worker process is started per segment (``process_num`` in total).
    Each worker runs ``func`` with five positional arguments::

        func(queue, start_pos, offset, list_to_process, kwargs)

    - ``queue``: a Manager queue private to this worker; ``func`` should
      ``put`` one or more lists of results onto it.
    - ``start_pos``: index of the first item of this worker's segment.
    - ``offset``: segment length (items per worker), so the worker's items
      are ``list_to_process[start_pos:start_pos + offset]``.
    - ``list_to_process``: the full input list.
    - ``kwargs``: a dict of any extra keyword arguments passed to this
      function (handed over as one positional dict, not unpacked).

    :param process_num: number of worker processes to start (>= 1).
    :param func: the per-segment worker described above. Under the
        ``spawn`` start method (e.g. Windows) it must be picklable, i.e.
        defined at module top level.
    :param list_to_process: the list to process.
    :return: the concatenation of everything the workers put on their
        queues, ordered by segment (worker 0 first) and, within a segment,
        in the order that worker put its lists.
    :raises ValueError: if a required argument is missing or
        ``process_num < 1``.
    :raises RuntimeError: if any worker process exits with a non-zero code.
    """
    # Explicit checks instead of assert, which is stripped under "python -O".
    if process_num is None or func is None or list_to_process is None:
        raise ValueError("process_num, func and list_to_process are required")
    if process_num < 1:
        raise ValueError("process_num must be >= 1, got %r" % (process_num,))

    # Ceiling division: the smallest segment size that still covers the
    # whole list. The old "len / n + 1" over-sized segments (9 items over 3
    # workers gave 4+4+1 instead of 3+3+3), and "//" keeps the result an int
    # on Python 3 as well as Python 2.
    batch_num = (len(list_to_process) + process_num - 1) // process_num

    manager = multiprocessing.Manager()
    try:
        # One queue per worker, so results can be reassembled in segment
        # order regardless of which worker finishes first.
        queues = [manager.Queue() for _ in range(process_num)]
        processes = []
        print("init process")
        for i, queue in enumerate(queues):
            proc = multiprocessing.Process(
                target=func,
                args=(queue, i * batch_num, batch_num, list_to_process, kwargs))
            processes.append(proc)
            proc.start()

        print("wait for process end")
        for proc in processes:
            proc.join()

        # A crashed worker would otherwise silently drop its whole segment.
        failed = [proc.exitcode for proc in processes if proc.exitcode != 0]
        if failed:
            raise RuntimeError(
                "%d worker process(es) failed, exit codes: %r"
                % (len(failed), failed))

        print("get results")
        result_list = []
        # Every worker has exited, so all of its (synchronous, proxied)
        # put() calls have completed and empty() is reliable here.
        for queue in queues:
            while not queue.empty():
                result_list += queue.get()
        print("returning results")
        return result_list
    finally:
        # Stop the manager's server process instead of leaving it running
        # until garbage collection.
        manager.shutdown()
def square_segment(queue, start, offset, the_list, kwargs=None):
    """Demo worker: put the squares of ``the_list[start:start + offset]`` on ``queue``.

    ``kwargs`` receives the extra-keyword-argument dict passed positionally
    by ``multi_process_list``; this demo does not use it. The function is
    defined at module top level (not under the ``__main__`` guard) so child
    processes can find it under the ``spawn`` start method, where the main
    module is re-imported without running the guarded block.
    """
    queue.put([item ** 2 for item in the_list[start:start + offset]])


if __name__ == '__main__':
    list_to_process1 = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    ret = multi_process_list(2, square_segment, list_to_process1)
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(ret)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment