Skip to content

Instantly share code, notes, and snippets.

@yangfch3
Last active September 27, 2021 07:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yangfch3/7b3d8ef079a6a423a535224d075a2f74 to your computer and use it in GitHub Desktop.
Save yangfch3/7b3d8ef079a6a423a535224d075a2f74 to your computer and use it in GitHub Desktop.
Python 系统任务多进程并行案例(以做过的某个卡牌游戏多进程跑牌为例)
import json
import os
import sys
import time
import base64
from subprocess import DEVNULL, STDOUT, Popen, PIPE
import merge_data
# File name of the JSON run configuration; it is resolved relative to the
# directory that contains this script.
conf_file: str = 'run.conf.json'

# Child processes that have been started and not yet removed after finishing.
proc_list: list = list()
def read_conf():
    """Load the run configuration stored next to this script.

    The file named by the module-level ``conf_file`` is looked up in the
    directory of this script (symlinks resolved) and decoded as UTF-8 JSON.

    Returns:
        The parsed JSON document.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    conf_path = os.path.join(script_dir, conf_file)
    with open(conf_path, mode='r', encoding='utf-8') as handle:
        return json.load(handle)
def _require(condition, message):
    """Raise ``AssertionError(message)`` unless ``condition`` is true.

    Used instead of the ``assert`` statement so that the configuration checks
    still run when the interpreter is started with ``-O``.
    """
    if not condition:
        raise AssertionError(message)


def _reap_one_finished(procs):
    """Remove at most one finished process from ``procs``.

    Every process is polled once, in list order. The first one that has
    exited is removed from the list and returned. If none has exited yet the
    function sleeps for 50 ms (to avoid a busy loop) and returns ``None``.
    """
    for proc in procs:
        if proc.poll() is None:
            continue
        # Mutating the list is safe here because we return immediately.
        procs.remove(proc)
        print("cur proc list len: ", len(procs))
        return proc
    time.sleep(.05)
    return None


def run():
    """Run every configured level in its own ``lua53`` process, then merge.

    Reads the configuration via ``read_conf()``, validates it, and starts one
    ``lua53 RunCard/PhantomRun.lua`` process per level from ``from_level`` to
    ``to_level`` (inclusive), keeping at most ``p_num`` processes alive at a
    time. When all processes have exited, ``merge_data.merge(out_dir)`` is
    called.

    Side effects: changes the current working directory to this script's
    directory and mutates the module-level ``proc_list``.

    Raises:
        AssertionError: if the configuration fails one of the sanity checks.
    """
    self_path = os.path.dirname(os.path.realpath(__file__))
    os.chdir(self_path)
    work_dir = os.path.join(self_path, "WorkDir")

    conf_obj = read_conf()
    from_level = conf_obj["from_level"]
    to_level = conf_obj["to_level"]
    max_procs = conf_obj["p_num"]

    _require(to_level >= from_level, "assert: 起止关卡不合法")
    _require(to_level - from_level < 1000, "assert: 起止关卡不能跨章节")
    _require(int(to_level / 1000) == int(from_level / 1000),
             "assert: 起止关卡必须为同一章节")
    _require(max_procs <= 10, "assert: 超出并行进程数限制")
    # With zero (or negative) slots no process could ever be started and the
    # scheduling loop below would never terminate.
    _require(max_procs >= 1, "assert: 并行进程数必须大于 0")
    # NOTE(review): this rejects repeat_num == 1 — confirm that is intended.
    _require(conf_obj["repeat_num"] > 1, "assert: 重复次数不合法")

    out_dir = "%s_%s_%s_%s_%s" % (
        from_level,
        to_level,
        conf_obj["repeat_num"],
        "_".join(conf_obj["tags"]),
        time.strftime("%Y%m%d_%H%M%S"),
    )
    tags_str = ",".join(conf_obj["tags"])

    # Prepend extra tool directories to PATH for the child processes. Use the
    # platform separator and tolerate a missing PATH variable.
    path_entries = ['/usr/sbin', '/sbin', '/usr/local/bin']
    inherited_path = os.environ.get('PATH')
    if inherited_path:
        path_entries.append(inherited_path)
    my_env = {**os.environ, 'PATH': os.pathsep.join(path_entries)}

    # Core scheduling loop: one system process per level, bounded by p_num.
    # When the limit is reached (or all levels have been started) wait for a
    # running process to finish.
    cur_level = from_level
    failed = 0
    start = time.perf_counter()
    while cur_level <= to_level or proc_list:
        if cur_level <= to_level and len(proc_list) < max_procs:
            # A slot is free: start the next level.
            print(cur_level)
            # The card-run logic is implemented in Lua and invoked as a
            # system command. Per the original author's note, PIPE for
            # stdout/stderr is not supported on Windows here, so the child
            # writes to this process's own streams.
            proc = Popen(
                [
                    'lua53', 'RunCard/PhantomRun.lua',
                    str(conf_obj["chapter"]),
                    str(cur_level),
                    str(conf_obj["use_robot"]),
                    str(conf_obj["repeat_num"]),
                    str(int(conf_obj["use_combo_reward"])),
                    str(int(conf_obj["use_star_full_mode"])),
                    out_dir,
                    tags_str,
                ],
                stdout=sys.stdout, stderr=sys.stderr, env=my_env, cwd=work_dir)
            proc_list.append(proc)
            cur_level += 1
            time.sleep(.05)
            continue

        # Either every level has been started or all slots are busy.
        finished = _reap_one_finished(proc_list)
        if finished is not None and finished.returncode != 0:
            # Do not abort the batch, but make the failure visible instead of
            # silently discarding the exit status.
            failed += 1
            print("warning: process exited with code %s: %r"
                  % (finished.returncode, finished.args), file=sys.stderr)
    end = time.perf_counter()
    print("done! used time = ", end - start)
    if failed:
        print("warning: %d process(es) exited with a non-zero code" % failed,
              file=sys.stderr)

    merge_data.merge(out_dir)
if __name__ == "__main__":
    # Script entry point: run the whole batch only when this file is executed
    # directly, not when it is imported.
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment