import hugectr
from mpi4py import MPI
import time
import os
import datetime
import hugectr2onnx
import json
import sys
from tools.log import Log
logger = Log(__name__).getlog()
import argparse
arg_parser = argparse.ArgumentParser(description="Offline model training")
arg_parser.add_argument("--model_name", type=str, default="din_1k_v2")
arg_parser.add_argument("--features_num", type=int, required=False,default=0)
arg_parser.add_argument("--train_keyset_num", type=int, default=25)
arg_parser.add_argument("--keyset_dir", type=str, required=True)
arg_parser.add_argument('--batch_size', type=int, default=36000)
arg_parser.add_argument('--batchsize_eval', type=int, default=36000)
arg_parser.add_argument('--max_eval_batches', type=int, default=5000)
arg_parser.add_argument('--lr', type=float, default=0.0001)
arg_parser.add_argument('--gpus', type=str, default='0')
arg_parser.add_argument('--num_workers', type=int, default=30)
arg_parser.add_argument('--slice', type=int, default=10)
arg_parser.add_argument('--label_name', type=str, default='label')
arg_parser.add_argument('--sparse_embedding_name', type=str, default='sparse_embedding1')
arg_parser.add_argument("--train_dir", type=str, required=True, default='/data')
arg_parser.add_argument('--json_dir', type=str, default='/json')
arg_parser.add_argument('--embedding_vec_size', type=int, default=25)
arg_parser.add_argument('--workspace_size_per_gpu_in_mb', type=int, default=10000) # 40000
arg_parser.add_argument('--workspace_size_per_gpu_in_mb_null', type=int, default=5000) # 8700
arg_parser.add_argument('--start_date', type=str, required=True)
arg_parser.add_argument('--end_date', type=str, required=True)
arg_parser.add_argument('--datePath', type=str, required=True)
args = arg_parser.parse_args()
# ------------------------------------------------------------------------------------------------------------
program_start = datetime.datetime.now()  # program start time
# Argument preprocessing
train_date = args.datePath
model_name = args.model_name
args.train_dir = args.train_dir + "/" + model_name + "/offline"
gpus = list(map(int, args.gpus.split(',')))
model_list = [model_name.replace(".py", "") for model_name in os.listdir("./model_dir") if (".py" in model_name) and ("log.py" not in model_name)]
print("======================================================异常检测=======================================================")
print("======================================================基础参数=======================================================")
logger.info("Program start time:{}".format(program_start))
logger.info("保存模型日期: {}".format(train_date))
logger.info("模型名称: {}".format(model_name))
logger.info("特征个数: {}".format(args.features_num))
logger.info("训练数据地址: {}".format(args.train_dir))
logger.info("gpu list: {}".format(gpus))
logger.info("sparse_embedding_name: {}".format(args.sparse_embedding_name))
# ------------------------------------------------------------------------------------------------------------
# Rule: start_date < train_date <= end_date
start_date = args.start_date.replace("-","")
end_date = args.end_date.replace("-","")
date1 = datetime.datetime.strptime(start_date, "%Y%m%d").date()
date2 = datetime.datetime.strptime(end_date, "%Y%m%d").date()
Days = (date2-date1).days
if Days <= 0:
    logger.info(f"Invalid date range, start date: {start_date}, end date: {end_date}")
    sys.exit()
logger.info(f"Training data date range: {start_date} < train_date <= {end_date}")
end_date_reshape = datetime.datetime.strptime(end_date, "%Y%m%d").date()
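# day_list enumerates every date in (start_date, end_date], newest first:
# end_date itself back through start_date + 1 day.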
day_list = [(end_date_reshape - datetime.timedelta(days=i)).strftime("%Y%m%d") for i in range(Days)]
# args.train_keyset_num = len(day_list)
logger.info(f"train_keyset_num: {args.train_keyset_num}")
print("=====================================================数据预处理=======================================================")
# Helper: recursively collect non-empty files with the given suffix
def get_files(dir, suffix):
    py_files = []
    for root, dirs, files in os.walk(dir):  # walk all directories, including the root
        for file in files:  # collect files matching the target suffix
            pre, suf = os.path.splitext(file)
            if suf == suffix:
                py_files.append(os.path.join(root, file))
    # skip empty files
    res = []
    for file in py_files:
        if os.path.getsize(file):
            res.append(file)
    return res
# Training data download
download_time = time.time()
# Read the parameter configuration (JSON)
json_time = time.time()
# offline_json_dir = "/data/txstream/txstreamv2/offlines/txstreamv2.json"
offline_json_dir = "/data/txstream/txstreamv2/txstreamv2.json"
logger.info(f"offline_json_dir: {offline_json_dir}")
with open(offline_json_dir, 'r', encoding='utf-8') as f:
    clusterIn = json.load(f)
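# Walk featureDescList and derive a vocabulary size per slot:
#   LabelEncoder      -> number of classes + 1
#   KBinsDiscretizer  -> nBins + 1
#   "null"            -> raw hash feature, fixed 110000000 buckets
#   SequenceEncoder   -> maxLen slots of 1000000 buckets each; the first
#                        sequence becomes "seq", the second becomes "cate"
#   anything else     -> fixed 10000 buckets
# t1 collects the non-sequence slot sizes seen before the first
# SequenceEncoder; t2 ends up holding the remaining non-sequence slots.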
cat_features_pos, slot_size_array, seq_array, cate_array = [], [], [], []
tmp, t1, t2 = [], [], []
cou = 0
for i in clusterIn["featureDescList"]:
    cat_features_pos.append(cou)
    if i["featureOperator"] == "LabelEncoder":
        slot_size_array.append(len(i["featureOpArgs"]["classes"].split(",")[:-1]) + 1)
        tmp.append(len(i["featureOpArgs"]["classes"].split(",")[:-1]) + 1)
        cou += 1
    elif i["featureOperator"] == "KBinsDiscretizer":
        slot_size_array.append(i["featureOpArgs"]["nBins"] + 1)
        tmp.append(i["featureOpArgs"]["nBins"] + 1)
        cou += 1
    elif i["featureOperator"] == "null":
        slot_size_array.append(110000000)
        tmp.append(110000000)
        cou += 1
    elif i["featureOperator"] == "SequenceEncoder":
        if len(t1) == 0:
            t1 = tmp
        else:
            t2 = tmp
        tmp = []
        if len(seq_array) == 0:
            for _ in range(int(i['featureOpArgs']['maxLen'])):
                seq_array.append(1000000)
                slot_size_array.append(1000000)
                cou += 1
        else:
            for _ in range(int(i['featureOpArgs']['maxLen'])):
                cate_array.append(1000000)
                slot_size_array.append(1000000)
                cou += 1
    else:
        slot_size_array.append(10000)
        tmp.append(10000)
        cou += 1
if len(t2) == 0:
    t2 = tmp
null_tmp = []
null_pos_list = []
for each in t1:
    if each != 110000000:
        null_tmp.append(each)
    else:
        null_pos_list.append(null_tmp)
        null_pos_list.append([each])
        null_tmp = []
if len(null_tmp) != 0:
    null_pos_list.append(null_tmp)
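# null_pos_list splits t1 into contiguous runs of "normal" slots plus a
# single-element group for every 110000000-bucket "null" slot, so that each
# group can get its own embedding table (and the huge hash slots a separate
# workspace size) further below.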
logger.info(f"json 文件处理耗时: {time.time()-json_time}s")
# t1 = [8, 244, 120, 326, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7]
# t2 = [0, 0, 0, 0, 0, 0, 0, 0, 84186]
# t3 = [3, 3, 216, 173, 5, 39, 158, 19, 17, 1691, 7, 3, 7, 7, 110000000, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 110000000, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 110000000, 7, 110000000, 7, 7, 7, 7, 7, 7, 110000000, 7, 7, 7, 7, 7, 7, 7, 7, 110000000, 7, 7, 1893]
# all_slot = t1 + t2 + t3
all_slot = slot_size_array
print("===========================================================================")
print("slot_size_array length:{},t1 length:{},t2 length:{},seq_array length:{},cate_array length:{}".format(len(slot_size_array),len(t1),len(t2),len(seq_array),len(cate_array)))
print("null_pos_list len:{}".format(len(null_pos_list)))
for each in null_pos_list:
    print("each list length: {}".format(len(each)))
print("===========================================================================")
cat_pos = cat_features_pos
label_name_init = clusterIn["tableArgs"]["labelList"][0]["labelFlag"]
if args.label_name != label_name_init:
    logger.info(f"label_name argument ({args.label_name}) is incorrect; corrected to the label_name from the JSON file: {label_name_init}")
    args.label_name = label_name_init
json_time = time.time() - json_time
logger.info(f"json 文件处理耗时: {json_time}s")
# ------------------------------------------------------------------------------------------------------------
# key_time = time.time()
# Create a keySet folder under the training data path and put the keyset parquet files in it
# Keyset file generation: TODO
if not os.path.exists(f"{args.keyset_dir}"):
    os.system(f"mkdir {args.keyset_dir}")
args.keyset_dir = args.keyset_dir + "/" + model_name
# os.system("rm -rf %s" % args.keyset_dir)
# os.system("mkdir %s" % args.keyset_dir)
logger.info(f"keyset path: {args.keyset_dir}")
str_cat_features_pos, str_slot_size_array = "", ""
for i in cat_pos:
    str_cat_features_pos += f"{i} "
for i in all_slot:
    str_slot_size_array += f"{i} "
# for pos in range(args.train_keyset_num):
#     logger.info(f"Processing batch {pos+1} of {args.train_keyset_num}")
#     os.system(f"python generate_keyset.py --file_dir '/root/train_dir/{model_name}' --save_dir {args.keyset_dir} --txt_num {1} --target {args.label_name} --cat_features_pos {str_cat_features_pos} --slot_size_array {str_slot_size_array} --pos {pos}")
# key_time = time.time() - key_time
# logger.info(f"Training data processing time: {key_time}s")
print("======================================================模型训练=======================================================")
train_time = time.time()
# source = [args.keyset_dir+ "/" + str(i) +"/0.txt" for i in range(10,args.train_keyset_num)]
# keyset = ["/root/keyset/din_1k_v1/0.keyset" for _ in range(10,args.train_keyset_num)]
source = ["/root/keyset_dir/din_1k_v3/0/0.txt"]
keyset = ["/root/keyset/0.keyset"]
print("#########################################################")
print("args.train_keyset_num",args.train_keyset_num)
print("source",source)
print("keyset",keyset)
print("#########################################################")
# ------------------------------------------------------------------------------------------------------------
# Clean up the local model directory
os.system(f"rm -rf /root/{args.model_name}")
os.system(f"mkdir /root/{args.model_name}")
# ------------------------------------------------------------------------------------------------------------
# Model training starts
solver = hugectr.CreateSolver(model_name = args.model_name,
                              max_eval_batches = args.max_eval_batches,
                              batchsize_eval = args.batchsize_eval,
                              batchsize = args.batch_size,
                              lr = args.lr,
                              vvgpu = [gpus],
                              i64_input_key = True,
                              use_mixed_precision = False,
                              repeat_dataset = False,
                              use_cuda_graph = True
                              )
reader = hugectr.DataReaderParams(data_reader_type = hugectr.DataReaderType_t.Parquet,
                                  source = source,
                                  keyset = keyset,
                                  eval_source = args.keyset_dir + "/0/eval.txt",
                                  num_workers = args.num_workers,
                                  slot_size_array = all_slot,
                                  check_type = hugectr.Check_t.Sum)
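# Parquet reader in embedding-training-cache (ETC) mode: each entry in
# `source` is a file list for one incremental-training pass, paired with the
# keyset file holding the unique categorical keys of that pass.
# slot_size_array tells the reader the vocabulary size of every slot.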
# optimizer = hugectr.CreateOptimizer(optimizer_type = hugectr.Optimizer_t.Adam)
optimizer = hugectr.CreateOptimizer(
    optimizer_type=hugectr.Optimizer_t.Adam,
    update_type=hugectr.Update_t.Global,
    beta1=0.9,
    beta2=0.999,
    epsilon=0.000000001,
)
etc = hugectr.CreateETC(ps_types = [hugectr.TrainPSType_t.Staged for _ in range(len(null_pos_list)+3)],
                        sparse_models = [f"/root/{args.model_name}/sparse_file{i}" for i in range(len(null_pos_list)+3)],
                        local_paths = ["/root/"])
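# One Staged parameter server per sparse embedding table: len(null_pos_list)
# grouped "data{i}" tables plus three more for "seq", "cate" and "data_other".
# Staged keeps each table in host memory for the duration of training.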
# etc = hugectr.CreateETC(ps_types = [hugectr.TrainPSType_t.Staged,
# hugectr.TrainPSType_t.Staged,
# hugectr.TrainPSType_t.Staged,
# hugectr.TrainPSType_t.Staged],
# sparse_models = [f"/root/{args.model_name}/sparse_file1",
# f"/root/{args.model_name}/sparse_file2",
# f"/root/{args.model_name}/sparse_file3",
# f"/root/{args.model_name}/sparse_file4"],
# local_paths = ["/root/"])
model = hugectr.Model(solver, reader, optimizer, etc)
model.add(hugectr.Input(label_dim = 1, label_name = args.label_name,
                        dense_dim = 0, dense_name = "dense",
                        data_reader_sparse_param_array =
                        [hugectr.DataReaderSparseParam(f"data{i}", 1, False, len(null_list)) for i, null_list in enumerate(null_pos_list)] +
                        [hugectr.DataReaderSparseParam("seq", 1, False, len(seq_array)),
                         hugectr.DataReaderSparseParam("cate", 1, False, len(cate_array)),
                         hugectr.DataReaderSparseParam("data_other", 1, False, len(t2))]))
for i, null_list in enumerate(null_pos_list):
    workspace_size_per_gpu_in_mb_num = args.workspace_size_per_gpu_in_mb
    if len(null_list) == 1 and null_list[0] == 110000000:
        workspace_size_per_gpu_in_mb_num = args.workspace_size_per_gpu_in_mb_null
    model.add(
        hugectr.SparseEmbedding(
            embedding_type=hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
            workspace_size_per_gpu_in_mb=workspace_size_per_gpu_in_mb_num,
            embedding_vec_size=args.embedding_vec_size,
            combiner="sum",
            sparse_embedding_name=f"sparse_embedding_name{i}",
            bottom_name=f"data{i}",
            optimizer=optimizer,
        )
    )
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Concat,
        bottom_names=[f"sparse_embedding_name{i}" for i in range(len(null_pos_list))],
        top_names=["sparse_embedding_data_combine"],
    )
)
model.add(
    hugectr.SparseEmbedding(
        embedding_type=hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
        workspace_size_per_gpu_in_mb=args.workspace_size_per_gpu_in_mb,
        embedding_vec_size=args.embedding_vec_size,
        combiner="sum",
        sparse_embedding_name="sparse_embedding_seq",
        bottom_name="seq",
        optimizer=optimizer,
    )
)
model.add(
    hugectr.SparseEmbedding(
        embedding_type=hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
        workspace_size_per_gpu_in_mb=args.workspace_size_per_gpu_in_mb,
        embedding_vec_size=args.embedding_vec_size,
        combiner="sum",
        sparse_embedding_name="sparse_embedding_cate",
        bottom_name="cate",
        optimizer=optimizer,
    )
)
model.add(
    hugectr.SparseEmbedding(
        embedding_type=hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
        workspace_size_per_gpu_in_mb=args.workspace_size_per_gpu_in_mb,
        embedding_vec_size=args.embedding_vec_size,
        combiner="sum",
        sparse_embedding_name="sparse_embedding_data2",
        bottom_name="data_other",
        optimizer=optimizer,
    )
)
#--------------------------------------------------------------------------------------------
# DIN attention layers
# model.add(
# hugectr.DenseLayer( layer_type = hugectr.Layer_t.Concat, bottom_names=["sparse_embedding_cate","sparse_embedding_cate"],
# top_names=["sparse_embedding_cate_scale2"],
# )
# )
# model.add(
# hugectr.DenseLayer( layer_type = hugectr.Layer_t.Concat, bottom_names=["sparse_embedding_cate_scale2","sparse_embedding_cate"],
# top_names=["sparse_embedding_cate_scale3"],
# )
# )
# model.add(
# hugectr.DenseLayer( layer_type = hugectr.Layer_t.Concat, bottom_names=["sparse_embedding_cate_scale3","sparse_embedding_cate"],
# top_names=["sparse_embedding_cate_scale4"],
# )
# )
# model.add(
# hugectr.DenseLayer( layer_type = hugectr.Layer_t.Concat, bottom_names=["sparse_embedding_cate_scale4","sparse_embedding_cate"],
# top_names=["sparse_embedding_cate_scale5"],
# )
# )
# model.add(
# hugectr.DenseLayer( layer_type = hugectr.Layer_t.Concat, bottom_names=["sparse_embedding_cate_scale5","sparse_embedding_cate"],
# top_names=["sparse_embedding_cate_scale6"],
# )
# )
# model.add(
# hugectr.DenseLayer( layer_type = hugectr.Layer_t.Concat, bottom_names=["sparse_embedding_cate_scale6","sparse_embedding_cate"],
# top_names=["sparse_embedding_cate_scale7"],
# )
# )
# model.add(
# hugectr.DenseLayer( layer_type = hugectr.Layer_t.Concat, bottom_names=["sparse_embedding_cate_scale7","sparse_embedding_cate"],
# top_names=["sparse_embedding_cate_scale8"],
# )
# )
# model.add(
# hugectr.DenseLayer( layer_type = hugectr.Layer_t.Concat, bottom_names=["sparse_embedding_cate_scale8","sparse_embedding_cate"],
# top_names=["sparse_embedding_cate_scale"],
# )
# )
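# Attention unit (the commented-out Concat ladder above is an older way of
# tiling the candidate embedding). FusedReshapeConcat concatenates the seq
# and cate embeddings along the feature dim and splits the last sequence
# position off as the candidate item, leaving len(seq_array)-1 history
# positions. Scale tiles the candidate across the history; the features
# [item, hist, item-hist, item*hist] feed a small MLP producing one score
# per position, which softmax turns into attention weights.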
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.FusedReshapeConcat,
        bottom_names=["sparse_embedding_seq", "sparse_embedding_cate"],
        top_names=["FusedReshapeConcat_item_his_em", "FusedReshapeConcat_item"],
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Scale,
        bottom_names=["FusedReshapeConcat_item"],
        top_names=["Scale_item"],
        axis=1,
        factor=len(seq_array)-1,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Sub,
        bottom_names=["Scale_item", "FusedReshapeConcat_item_his_em"],
        top_names=["sub_ih"],
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.ElementwiseMultiply,
        bottom_names=["Scale_item", "FusedReshapeConcat_item_his_em"],
        top_names=["ElementwiseMul_i"],
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Concat,
        bottom_names=["Scale_item", "FusedReshapeConcat_item_his_em", "sub_ih", "ElementwiseMul_i"],
        top_names=["concat_i_h"],
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.InnerProduct,
        bottom_names=["concat_i_h"],
        top_names=["fc_att_i2"],
        num_output=40,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.InnerProduct,
        bottom_names=["fc_att_i2"],
        top_names=["fc_att_i3"],
        num_output=1,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Reshape,
        bottom_names=["fc_att_i3"],
        top_names=["reshape_score"],
        leading_dim=len(seq_array)-1,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Softmax,
        bottom_names=["reshape_score"],
        top_names=["softmax_att_i"],
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Scale,
        bottom_names=["softmax_att_i"],
        top_names=["Scale_i"],
        axis=0,
        factor=args.embedding_vec_size*2,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Reshape,
        bottom_names=["FusedReshapeConcat_item_his_em"],
        top_names=["reshape_item_his"],
        leading_dim=args.embedding_vec_size*2*(len(seq_array)-1),
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.ElementwiseMultiply,  # weighted history (matmul-style)
        bottom_names=["Scale_i", "reshape_item_his"],
        top_names=["ElementwiseMul_ih"],
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.ReduceSum,
        bottom_names=["ElementwiseMul_ih"],
        top_names=["reduce_ih"],
        axis=1,
    )
)
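# reduce_ih: attention-weighted sum over the history positions, i.e. the
# pooled user-interest representation fed to the MLP below.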
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Reshape,
        bottom_names=["FusedReshapeConcat_item_his_em"],
        top_names=["reshape_his"],
        leading_dim=args.embedding_vec_size*2,
        time_step=len(seq_array)-1,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.ReduceMean,
        bottom_names=["reshape_his"],
        top_names=["reduce_item_his"],
        axis=1,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Reshape,
        bottom_names=["reduce_item_his"],
        top_names=["reshape_reduce_item_his"],
        leading_dim=args.embedding_vec_size*2,
    )
)
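# reshape_reduce_item_his: mean of the raw history embeddings over time, a
# simple average-pooled summary used alongside the attention output.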
#------------------------------------------------------------------------------------------
# Deep (fully connected) layers
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Reshape,
        bottom_names=["sparse_embedding_data_combine"],
        top_names=["reshape_user"],
        leading_dim=args.embedding_vec_size*len(t1),
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Reshape,
        bottom_names=["sparse_embedding_data2"],
        top_names=["reshape_item"],
        leading_dim=args.embedding_vec_size*(len(t2)),
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Concat,
        bottom_names=[
            "reshape_user",
            "reshape_item",
            "reshape_reduce_item_his",
            "reduce_ih",
            "FusedReshapeConcat_item",
        ],
        top_names=["concat_din_i"],
    )
)
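# concat_din_i stitches together the user-profile embeddings, the trailing
# item features, the mean-pooled history, the attention-pooled history and
# the candidate-item embedding: the input to the DIN fully connected net.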
# build_fcn_net
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.InnerProduct,
        bottom_names=["concat_din_i"],
        top_names=["fc_din_i1"],
        num_output=200,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.PReLU_Dice,
        bottom_names=["fc_din_i1"],
        top_names=["dice_1"],
        elu_alpha=0.2,
        eps=1e-8,
    )
)
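# PReLU_Dice is the Dice activation from the DIN paper: a PReLU whose
# rectification point adapts to the mini-batch mean/variance of its input
# (eps guards the variance normalization).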
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.InnerProduct,
        bottom_names=["dice_1"],
        top_names=["fc_din_i2"],
        num_output=80,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.PReLU_Dice,
        bottom_names=["fc_din_i2"],
        top_names=["dice_2"],
        elu_alpha=0.2,
        eps=1e-8,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.InnerProduct,
        bottom_names=["dice_2"],
        top_names=["fc3"],
        num_output=1,
    )
)
model.add(
    hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.BinaryCrossEntropyLoss,
        bottom_names=["fc3", args.label_name],  # label tensor name must match the Input layer
        top_names=["loss"],
    )
)
#------------------------------------------------------------------------------------------
# train
model.compile()
model.summary()
model.graph_to_json(graph_config_file = "/root/" + args.model_name + ".json")
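# graph_to_json dumps the network graph; together with the weights saved
# below it can later be converted with hugectr2onnx (imported above).
# Epoch mode (repeat_dataset=False): fit makes one pass over the source
# list, printing loss every 500 iterations and evaluating every 100.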
model.fit(num_epochs = 1, display = 500, eval_interval = 100)
train_time = time.time() - train_time
logger.info(f"模型训练耗时: {train_time}s")
save_time = time.time()
model.save_params_to_files(f"/root/{args.model_name}")
save_time = time.time() - save_time
logger.info(f"保存模型耗时: {save_time}s")
print("==================================================模型: {}, 耗时汇总===================================================".format(model_name))
# logger.info("下载数据耗时: {:.2f}s".format(download_time))
logger.info("json 文件处理耗时: {:.2f}s".format(json_time))
# logger.info("训练数据处理耗时: {:.2f}s".format(key_time))
logger.info("模型训练耗时: {:.2f}s".format(train_time))
logger.info("模型保存耗时: {:.2f}s".format(save_time))
# logger.info("总耗时: {:.2f}s".format(download_time+json_time+key_time+train_time+save_time))
program_end = datetime.datetime.now() # 程序结束的时间
logger.info("Program end :{}".format(program_end))