Skip to content

Instantly share code, notes, and snippets.

@piercelamb
Created December 19, 2022 20:25
Show Gist options
  • Save piercelamb/d21f74f5a96656c5fd91d5ba9209380d to your computer and use it in GitHub Desktop.
copy_s3_data_in_parallel
def copy_s3_data_in_parallel(
    df: pd.DataFrame,
    bucket: str,
    raw_training_data_paths: Dict[str, List[str]],
    s3_artifact_path: str,
    num_processes: int,
    process_func: Optional[Callable[[str, List, str, bool], None]] = None,
    reload: bool = False):
    """Fan out S3 data copying across ``num_processes`` worker processes.

    Splits *df* into roughly equal chunks with ``np.array_split`` and spawns
    one ``multiprocessing.Process`` per chunk, each running
    ``add_to_research_experiment``. Blocks until every worker has joined.

    :param df: rows describing the documents to process (one row per document,
        judging by the "documents" log line — confirm against callers).
    :param bucket: S3 bucket name, passed through to the workers.
    :param raw_training_data_paths: mapping forwarded verbatim to each worker;
        presumably source paths keyed by some identifier — verify upstream.
    :param s3_artifact_path: S3 prefix used both to list existing artifacts
        and as the workers' destination prefix.
    :param num_processes: number of worker processes to spawn.
    :param process_func: optional callable forwarded to the workers.
    :param reload: flag forwarded to the workers; NOTE(review): name suggests
        "re-copy even if present" — confirm in add_to_research_experiment.
    """
    # List the artifact prefix once, up front, so the (possibly expensive)
    # listing is not repeated inside each worker process.
    existing_artifacts_state = get_raw_data_paths(
        bucket, path_to_filter_for=s3_artifact_path
    )
    # array_split tolerates len(df) not being divisible by num_processes:
    # some chunks get one extra row, and chunks may be empty for small df.
    split_df = np.array_split(df, num_processes)
    processes = []
    for idx, split in enumerate(split_df):
        process = multiprocessing.Process(
            target=add_to_research_experiment,
            args=(
                split,
                bucket,
                s3_artifact_path,
                raw_training_data_paths,
                existing_artifacts_state,
                idx + 1,  # 1-based worker id
                process_func,
                reload,
            ),
        )
        processes.append(process)
        print(f"Starting {process.name} with {len(split)} documents")
        process.start()
    # Join every worker before returning so the copy is complete for callers.
    for process in processes:
        process.join()
        print(f"Finished {process.name}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment