Skip to content

Instantly share code, notes, and snippets.

@ugovaretto
Created October 15, 2020 07:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ugovaretto/f606086f2de11c8625ca140ca2465e50 to your computer and use it in GitHub Desktop.
Save ugovaretto/f606086f2de11c8625ca140ca2465e50 to your computer and use it in GitHub Desktop.
Parallel copy
/*******************************************************************************
* BSD 3-Clause License
*
* Copyright (c) 2020, Ugo Varetto
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions inputFile source code must retain the above copyright
*notice, this list inputFile conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list inputFile conditions and the following disclaimer in the
*documentation and/or other materials provided with the distribution.
*
* 3. Neither the name inputFile the copyright holder nor the names inputFile
*its contributors may be used to endorse or promote products derived from this
*software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
*ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
*LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
*CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
*SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
*INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
*CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
*ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
*POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
// parallel copy: intended for parallel filesystems, usually way faster on
// non-parallel filesystems as well
//
// local SSD on six-core laptop (60MB file):
//
// time ./pwcp afile anotherfile2 1 (one thread)
// real 0m0.096s
//
// time ./pwcp afile anotherfile2 4 (four threads)
// real 0m0.051s
//
// local SSD on six-core laptop (1GB file):
//
// time ./pwcp alargefile alargefile_copy 6 (six threads)
// real 0m0.441s
//
// time ./pwcp alargefile alargefile_copy 1 (one thread)
// real 0m1.418s
#include <cstring>
#include <future>
#include <iostream>
#include <thread>
#include <vector>
#if __cplusplus < 201703L
#error "C++17 standard required" //filesystem
#endif
#ifdef __GNUC__
#if __GNUC__ > 8 //do not want to require stdc++fs
#include <filesystem>
#else
#include <sys/stat.h>
#include <sys/types.h>
#endif
#else
#include <filesystem>
#endif
size_t FileSize(const char* filename) {
#ifdef __GNUC__
#if __GNUC__ > 8
return std::filesystem::file_size(filename);
#else
struct stat st;
if (stat(filename, &st) == 0) return st.st_size;
return 0;
#endif
#else
return std::filesystem::file_size(filename);
#endif
}
using namespace std;
//------------------------------------------------------------------------------
void PError(const char* msg) {
perror(msg);
exit(EXIT_FAILURE);
}
//------------------------------------------------------------------------------
void CopyData(const char* ifname, const char* ofname, size_t offset,
size_t size) {
thread_local static FILE* ifile = fopen(ifname, "rb");
if (!ifile) PError("Error opening input file");
if(fseek(ifile, offset, SEEK_SET)) PError("Error moving file pointer");
thread_local static FILE* ofile = fopen(ofname, "wb");
if (!ofile) PError("Error opening output file");
if(fseek(ofile, offset, SEEK_SET)) PError("Error moving file pointer");
thread_local static vector<char> data(size);
const size_t rd = fread(data.data(), data.size(), 1, ifile);
if (rd != 1) PError("Error reading data");
const size_t wd = fwrite(data.data(), data.size(), 1, ofile);
if (wd != 1) PError("Error writing data");
fclose(ifile);
fclose(ofile);
}
//------------------------------------------------------------------------------
int main(int argc, char const* argv[]) {
if (argc < 3 || argc > 4) {
cerr << "Usage: " << argv[0]
<< " <infile> <outfile> [num jobs: default = number of cores]>"
<< endl;
return 1;
}
const char* ifname = argv[1];
const char* ofname = argv[2];
const uint jobs =
argc == 4 ? atoi(argv[3]) : thread::hardware_concurrency();
if (jobs == 0) {
cerr << "Wrong number of jobs specified" << endl;
exit(EXIT_FAILURE);
}
const size_t fileSize = FileSize(ifname);
if(!fileSize) PError("Error retrieving input file size");
// create ofname file
FILE* fp = fopen(ofname, "wb");
if (!fp) {
cerr << "Error creating output file " << ofname << endl;
exit(EXIT_FAILURE);
}
fseek(fp, fileSize, SEEK_SET);
fputc('\0', fp);
fclose(fp);
// compute chunk size
const size_t chunkSize = fileSize / jobs;
// compute last chunk size
const size_t lastChunkSize =
fileSize % jobs == 0 ? chunkSize : fileSize % jobs;
vector<future<void>> f(jobs);
// copy separate chunks with separate threads
for (int j = 0; j != jobs; ++j) {
if (j != jobs - 1) {
f[j] = async(launch::async, CopyData, ifname, ofname, chunkSize * j,
chunkSize);
} else {
f[j] = async(launch::async, CopyData, ifname, ofname, chunkSize * j,
lastChunkSize);
}
}
// wait for completion
for (auto& i : f) i.wait();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment