Skip to content

Instantly share code, notes, and snippets.

@ericek111
Created September 16, 2022 22:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ericek111/b356758a9bc41bb16042b02c78d16395 to your computer and use it in GitHub Desktop.
Save ericek111/b356758a9bc41bb16042b02c78d16395 to your computer and use it in GitHub Desktop.
Use inotify to watch a directory and process the content that another process writes to files in it.
#include <poll.h>
#include <sys/inotify.h>
#include <sys/ioctl.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <filesystem>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <opus.h>
// On-disk header of a RIFF/WAVE file. handleNewFile() reads the first
// sizeof(WavHeader) bytes of a file straight into this struct, so the member
// order and widths below must not be changed.
struct WavHeader {
    // ---- RIFF chunk descriptor ----
    uint8_t  RIFF[4];        // RIFF magic header
    uint32_t ChunkSize;      // size of the RIFF chunk
    uint8_t  WAVE[4];        // WAVE header

    // ---- "fmt" sub-chunk ----
    uint8_t  fmt[4];         // FMT header
    uint32_t Subchunk1Size;  // size of the fmt chunk
    uint16_t AudioFormat;    // 1 = PCM, 6 = mu-law, 7 = A-law,
                             // 257 = IBM mu-law, 258 = IBM A-law, 259 = ADPCM
    uint16_t NumOfChan;      // channel count: 1 = mono, 2 = stereo
    uint32_t SamplesPerSec;  // sampling frequency in Hz
    uint32_t bytesPerSec;    // bytes per second
    uint16_t blockAlign;     // 2 = 16-bit mono, 4 = 16-bit stereo
    uint16_t bitsPerSample;  // bits per sample

    // ---- "data" sub-chunk ----
    uint8_t  Subchunk2ID[4]; // "data" string
    uint32_t Subchunk2Size;  // length of the sampled data
};
// State for one watched file, shared (through std::shared_ptr) between the
// inotify loop in main() and the worker thread running handleNewFile().
struct InotifyCompressJob {
    // Path handed to fopen() by the worker.
    std::string filePath;
    // The worker thread that follows this file.
    std::thread th;
    // Notified by main() whenever an inotify event arrives for this file.
    std::condition_variable cv;
    // Mutex the worker pairs with `cv`.
    std::mutex mtx;
    // Cleared by main() when IN_CLOSE_WRITE is reported; the worker loops while it is set.
    std::atomic<bool> stillOpen{true};
};
// Process-wide run flag. It is cleared by the signal handlers installed in
// main() and by main() itself on shutdown, and polled by the inotify loop and
// by every handleNewFile() worker. It is atomic because it is written from a
// signal handler and read concurrently from several threads; a plain bool
// would be a data race.
std::atomic<bool> g_shouldRun{true};
/*
 * Worker-thread entry point: follows one file while another process is still
 * writing it.
 *
 * The function opens job->filePath and then loops while both g_shouldRun and
 * job->stillOpen are set. Each iteration waits on job->cv (at most 500 ms),
 * asks the kernel through FIONREAD how many bytes can be read, and then
 *   1. reads the WavHeader once at least sizeof(WavHeader) bytes are there;
 *   2. afterwards reads one chunk of header.bytesPerSec bytes whenever at
 *      least that many bytes are pending.
 * The chunk contents are not processed yet (see the TODO at the end).
 *
 * job: shared state for this file; taken by value so the worker keeps the
 *      job alive for as long as it runs.
 *
 * NOTE(review): the header is read as raw bytes into the struct, so this
 * presumably expects a little-endian host — confirm before porting.
 */
void handleNewFile(std::shared_ptr<InotifyCompressJob> job) {
    std::cout << "Started new thread for " << job->filePath << std::endl;

    FILE* file = fopen(job->filePath.c_str(), "r");
    if (file == nullptr) {
        std::cerr << "Can't open new file '" << job->filePath << "': " << std::strerror(errno) << std::endl;
        return;
    }
    const int fd = fileno(file);

    int bytesAvailable = 0;
    size_t readSoFar = 0;
    bool hasHeader = false;
    WavHeader inHeader{};
    std::vector<std::byte> inBuf;

    // The lock is held for the whole loop and only released inside wait_for().
    std::unique_lock<std::mutex> lck(job->mtx);
    while (g_shouldRun && job->stillOpen) {
        job->cv.wait_for(lck, std::chrono::milliseconds(500));

        // NOTE(review): FIONREAD on a regular file is relied on to report the
        // bytes between the current read offset and end of file — confirm for
        // the target kernel/filesystem.
        if (ioctl(fd, FIONREAD, &bytesAvailable) == -1) {
            std::cerr << "ioctl error on FIONREAD for '" << job->filePath << "': " << std::strerror(errno) << std::endl;
            continue;
        }
        size_t pending = bytesAvailable > 0 ? static_cast<size_t>(bytesAvailable) : 0;

        if (!hasHeader) {
            if (pending < sizeof(inHeader)) {
                continue;
            }

            const ssize_t headerRead = read(fd, &inHeader, sizeof(inHeader));
            if (headerRead == -1) {
                std::cerr << "Failed to read the WAV header of '" << job->filePath << "': " << std::strerror(errno) << std::endl;
                continue;
            }
            if (static_cast<size_t>(headerRead) != sizeof(inHeader)) {
                // A partial header is useless; rewind so the next attempt
                // starts from the beginning of the file again.
                std::cerr << "Short read of the WAV header of '" << job->filePath << "'" << std::endl;
                lseek(fd, 0, SEEK_SET);
                continue;
            }
            if (inHeader.bytesPerSec == 0) {
                // The chunk size is taken from this field; zero would make
                // every later read() a no-op and the loop would never advance.
                std::cerr << "Invalid WAV header (0 bytes per second) in '" << job->filePath << "'" << std::endl;
                break;
            }

            readSoFar += static_cast<size_t>(headerRead);
            // The header bytes are consumed, so they no longer count as pending.
            pending -= static_cast<size_t>(headerRead);
            // resize(), not reserve(): read() below writes into the elements,
            // which must actually exist.
            inBuf.resize(inHeader.bytesPerSec);
            hasHeader = true;
        }

        if (pending < inBuf.size()) {
            // Not enough data for a full chunk yet.
            continue;
        }

        const ssize_t readNow = read(fd, inBuf.data(), inBuf.size());
        if (readNow == -1) {
            std::cerr << "Failed to read " << inBuf.size() << " B from '" << job->filePath << "': " << std::strerror(errno) << std::endl;
            continue;
        }
        readSoFar += static_cast<size_t>(readNow);
        std::cout << "Got " << readNow << " / " << pending << " in " << job->filePath << std::endl;
    }

    // TODO: Process the rest of the buffer.
    std::cout << "Closed with " << readSoFar << " B read: " << job->filePath << std::endl;
    fclose(file);
}
int main(int argc, char** argv) {
if (argc < 2) {
std::cerr << "Usage: ./inotifycompress (path to directory with recordings)" << std::endl;
return 1;
}
std::filesystem::path path(argv[1]);
if (!std::filesystem::is_directory(path)) {
std::cerr << "'" << path << "' is not a directory or does not exist!" << std::endl;
return 2;
}
int fd = inotify_init();
if (fd == -1) {
std::cerr << "Failed to inotify_init: " << std::strerror(errno) << std::endl;
return 3;
}
signal(SIGINT, [](int signum) {
g_shouldRun = false;
});
signal(SIGABRT, [](int signum) {
g_shouldRun = false;
});
std::map<std::string, std::shared_ptr<InotifyCompressJob>> threadMap;
int wd = inotify_add_watch(fd, path.c_str(), IN_CREATE | IN_MODIFY | IN_CLOSE_WRITE);
std::vector<std::byte> eventBuf;
eventBuf.reserve(64 * (sizeof (struct inotify_event)) + NAME_MAX + 1);
ssize_t readLen;
struct pollfd pfd = { fd, POLLIN, 0 };
while (g_shouldRun) {
int ret = poll(&pfd, 1, 100); // timeout of 100ms
if (ret == -1) {
if (errno == EINTR) {
break;
}
std::cerr << "poll failed: " << std::strerror(errno) << std::endl;
return 7;
} else if (ret == 0) {
continue;
}
readLen = read(fd, eventBuf.data(), eventBuf.capacity());
if (readLen == 0) {
std::cerr << "read from inotify returned 0!" << std::endl;
return 4;
} else if (readLen == -1) {
std::cerr << "Failed to read from inotify: " << std::strerror(errno) << std::endl;
return 5;
}
for (std::byte* cursor = eventBuf.data(); cursor < eventBuf.data() + readLen; ) {
auto event = (struct inotify_event*) cursor;
cursor += sizeof(struct inotify_event) + event->len;
std::string fileName{event->name};
if (!fileName.starts_with("audio_") || !fileName.ends_with(".wav"))
continue;
if (!threadMap.contains(fileName)) {
auto entry = std::make_shared<InotifyCompressJob>();
entry->filePath = std::string{path} + std::filesystem::path::preferred_separator + fileName;
entry->th = std::thread{handleNewFile, entry};
threadMap[fileName] = entry;
} else {
auto& entry = threadMap[fileName];
if (event->mask & IN_CLOSE_WRITE) {
entry->stillOpen = false;
entry->th.detach(); // TODO: Make this better, destructor is called next.
threadMap.erase(fileName);
}
entry->cv.notify_all();
}
}
}
g_shouldRun = false; // for other threads and housekeeping
inotify_rm_watch(fd, wd);
close(fd);
for (auto& [fileName, entry] : threadMap) {
if (entry->th.joinable()) {
entry->th.join();
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment