from mpi4py import MPI import pandas as pd import matplotlib.pyplot as plt import numpy as np import time import sys import json if __name__ == '__main__': comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() file_list = ['wls_day-07_5000000aa', 'wls_day-07_5000000ab', 'wls_day-07_5000000ac', 'wls_day-07_5000000ad', 'wls_day-07_5000000ae', 'wls_day-07_5000000af', 'wls_day-07_5000000ag', 'wls_day-07_5000000ah', 'wls_day-07_5000000ai', 'wls_day-07_5000000aj'] #file_listの要素数だけプロセスを発生するようにする(今回は10) #(1)各プロセスでrank数のfile_listを取り込み json_file = open(file_list[rank]) df_jsonl = pd.read_json(json_file, orient='records', lines=True) #(2)各プロセスから取り込んだ内容を集約して、一つのデータフレームにする df_all = None #データフレームの結合 if rank == 0: df_all = df_jsonl #自分のデータフレームを代入 for i in range(1, size): #他プロセスのデータフレームを受け取り結合 df_rank = comm.recv(source=i, tag=11) df_all = pd.concat([df_all, df_rank], ignore_index=True) print('結合完了') else : comm.send(df_jsonl, dest=0, tag=11) if rank == 0: df_all.to_csv('./wls_day-07_all.csv', index=False) #mpiexec -n 10 python3 HostEvents_CSV_MPI.py