Skip to content

Instantly share code, notes, and snippets.

@testillano
Last active July 18, 2022 07:10
Show Gist options
  • Save testillano/eaf0c796966604b12f850e78e70c5d67 to your computer and use it in GitHub Desktop.
Save testillano/eaf0c796966604b12f850e78e70c5d67 to your computer and use it in GitHub Desktop.
SafeFile with close delayed and max files opened control
///////////////////////////////////////////////////////////////
// Safe file: allows safe writing of text/binary files.
// Close delay configurable.
// Max opened files controlled by condition variable.
//
// LINK: g++ main.cc -l pthread -l boost_system -l boost_thread
///////////////////////////////////////////////////////////////
#include <unistd.h> // sysconf

#include <atomic>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
class SafeFile {
std::string path_;
std::ofstream file_;
int max_open_files_;
std::mutex mutex_; // write file mutex
bool opened_;
boost::asio::deadline_timer *timer_{};
unsigned int close_delay_us_;
boost::asio::io_service *io_service_{};
void delayedClose() {
if (!timer_) timer_ = new boost::asio::deadline_timer(*io_service_, boost::posix_time::microseconds(close_delay_us_));
timer_->cancel();
timer_->expires_from_now(boost::posix_time::microseconds(close_delay_us_));
timer_->async_wait([this] (const boost::system::error_code& e) {
if( e ) return; // probably, we were cancelled (boost::asio::error::operation_aborted)
close();
});
}
public:
/**
* Constructor
*
* @param timersIoService asio io service which will be used to delay close
* operations with the intention to reduce overhead in some scenarios.
* @param path file path to write. It could be relative (to execution path) or absolute.
* @param closeDelayUs delay after last write operation, to close the file. By default
* it is configured to 1 second, something appropiate to log over long term files.
* Zero value means that no planned close is scheduled, so the file is opened,
* written and closed in the same moment. This could be interesting for write
* many different files when they are not rewritten. If they need to append
* data later, use @setCloseDelay to configure them as short term files
* taking into account the maximum number of files that your system could
* open simultaneously. This class will blocks new open operations when that
* limit is reached, to prevent file system errors.
* @param mode open mode. By default, text files and append is selected. You
* could anyway add other flags, for example for binary dumps: std::ios::binary
*/
SafeFile (boost::asio::io_service *timersIoService,
const std::string& path,
unsigned int closeDelayUs = 1000000 /* 1 second */,
std::ios_base::openmode mode = std::ofstream::out | std::ios_base::app): io_service_(timersIoService),
path_(path),
close_delay_us_(closeDelayUs),
opened_(false),
timer_(nullptr)
{
max_open_files_ = sysconf(_SC_OPEN_MAX /* 1024 probably */) - 10 /* margin just in case the process open other files */;
open(mode);
}
~SafeFile() { close(); delete timer_; }
// Opened files control:
static std::atomic<int> CurrentOpenedFiles;
static std::mutex MutexOpenedFiles;
static std::condition_variable OpenedFilesCV;
/**
* Open the file for writting
*
* @param mode open mode. By default, text files and append is selected. You
* could anyway add other flags, for example for binary dumps: std::ios::binary
*/
void open(std::ios_base::openmode mode = std::ofstream::out | std::ios_base::app) {
std::unique_lock<std::mutex> lock(MutexOpenedFiles);
if (opened_) return;
//Wait until we have data or a quit signal
OpenedFilesCV.wait(lock, [this]
{
return (CurrentOpenedFiles.load() < max_open_files_);
});
// After wait, we own the lock
file_.open(path_, mode);
if (file_.is_open()) {
opened_ = true;
CurrentOpenedFiles++;
std::cout << "Opened file " << path_ << '\n';
}
lock.unlock();
}
/**
* Close the file
*/
void close() {
std::unique_lock<std::mutex> lock(MutexOpenedFiles);
if (!opened_) return;
file_.close();
opened_ = false;
CurrentOpenedFiles--;
std::cout << "Closed file " << path_ << '\n';
lock.unlock();
OpenedFilesCV.notify_one();
}
/**
* Empty the file
*/
void empty() {
open(std::ofstream::out | std::ofstream::trunc);
close();
}
/**
* Set the delay in microseconds to close an opened file after writting over it.
*
* A value of zero will disable the delayed procedure, and close will be done
* after write operation (instant close).
*
* The class constructor sets 1 second by default, appropiate for logging
* oriented files which represent the most common and reasonable usage.
*
* We could consider 'short term files' those which are going to be rewritten
* (like long term ones) but when there are many of them and maximum opened
* files limit is a factor to take into account. So if your application is
* writting many different files, to optimize for example a load traffic rate
* of 200k req/s with a limit of 1024 concurrent files, we need a maximum
* delay of 1024/200000 = 0,00512 = 5 msecs.
*
* @param usecs microseconds of delay. Zero disables planned close and will be done instantly.
*/
void setCloseDelayUs(unsigned int usecs) {
close_delay_us_ = usecs;
}
/** Class string representation */
std::string asString() {
std::string result = "SafeFile | path: ";
result += path_;
result += " | size (bytes): ";
std::ifstream file( path_, std::ofstream::in | std::ios::ate | std::ios::binary); // valid also for text files
result += std::to_string(file.tellg());
file.close();
result += " | state: ";
result += (opened_ ? "opened":"closed");
return result;
}
/**
* Write data to the file.
* Close could be delayed.
*
* @param data data to write
* @see setCloseDelay()
*/
void write (const std::string& data) {
// Open file (lazy):
open();
// Write file:
std::lock_guard<std::mutex> lock(mutex_);
file_.write(data.c_str(), data.size());
// Close file:
if (close_delay_us_ != 0) {
delayedClose();
}
else {
close();
}
}
};
// Process-wide state for the opened-files budget shared by every SafeFile:
// current count (starts at zero), the mutex guarding it and the condition
// variable that open() waits on until a slot becomes free.
std::atomic<int> SafeFile::CurrentOpenedFiles{0};
std::mutex SafeFile::MutexOpenedFiles{};
std::condition_variable SafeFile::OpenedFilesCV{};
int main () {
auto timersIoService = new boost::asio::io_service();
std::thread tt([&] {
boost::asio::io_service::work work(*timersIoService);
timersIoService->run();
});
std::cout << "Timers io context running ..." << '\n';
std::vector<std::thread> threads;
int nthreads = 10;
for (int n=0; n < nthreads; n++) {
threads.push_back(std::thread ([&tt, n, timersIoService]() {
auto myFile = std::make_shared<SafeFile>(timersIoService, std::string("file.txt.") + std::to_string(n)); // long term file: 1 second by default
//auto myFile = std::make_shared<SafeFile>(timersIoService, std::string("file.txt.") + std::to_string(n), 5000); // short term file: less overhead, more risk to reach opened files limit
//auto myFile = std::make_shared<SafeFile>(timersIoService, std::string("file.txt.") + std::to_string(n), 0); // instant file: atomic open/write/close: overhead
for(int k=0; k<500; k++) myFile->write(std::string("Hi from: ") + myFile->asString() + "\n");
std::cout << "Waiting possible close operation ..." << '\n';
sleep(2);
}));
}
for (int n=0; n < nthreads; n++) {
threads[n].join();
}
boost::asio::deadline_timer exitTimer(*timersIoService, boost::posix_time::milliseconds(100));
exitTimer.async_wait([&] (const boost::system::error_code& e) { timersIoService->stop(); std::cout << "Exiting ..." << '\n'; });
tt.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment