
@vmrob
Last active July 20, 2020 14:44
Non-blocking reader for std::istream. Compile with C++14.
#include <iostream>
#include <chrono>
#include <thread>
#include <queue>
#include <string>
#include <atomic>
#include <mutex>
#include <utility> // std::move
class AsyncStreamReader {
public:
    // This constructor uses a member initializer list to initialize _stream
    // and _eof. See http://en.cppreference.com/w/cpp/language/initializer_list
    //
    // std::atomic_bool has a constructor that takes a bool, so direct
    // initialization like _eof(false) works fine here. What doesn't work
    // before C++17 is copy-initialization such as `std::atomic_bool b = false;`,
    // because std::atomic's copy constructor is deleted.
    //
    // `explicit` stops a std::istream* from being silently converted into an
    // AsyncStreamReader.
    explicit AsyncStreamReader(std::istream* stream)
        : _stream(stream), _eof(false)
    {
        // We probably shouldn't start the thread in the initializer list:
        // while the initializer list runs, the object hasn't been fully
        // constructed yet, and we don't want to spin off another thread that
        // uses this object while we're still constructing it!
        //
        // By the time the body of the constructor executes, every member has
        // been initialized, so we can safely spin off the thread now.
        //
        // The thread is constructed from a lambda that captures `this`. We
        // need `this` because the lambda calls a member function
        // (threadEntry) on this particular object.
        // See http://en.cppreference.com/w/cpp/language/lambda for info on
        // lambdas.
        //
        // Note that the new thread starts executing the lambda as soon as the
        // std::thread is constructed, so another thread is [probably] already
        // running before this constructor returns.
        _thread = std::thread([this] { threadEntry(); });
    }

    // A std::thread that is still joinable when it is destroyed calls
    // std::terminate, so we have to join it here. join() waits until
    // threadEntry() returns, which only happens once the stream can't be
    // read anymore (e.g. at eof). If the stream never ends, this blocks.
    ~AsyncStreamReader() {
        if (_thread.joinable()) {
            _thread.join();
        }
    }

    bool isReady() {
        // lock_guard is an excellent helper object. It applies the RAII idiom
        // (http://en.cppreference.com/w/cpp/language/raii) to mutexes: its
        // destructor is guaranteed to call _mutex.unlock() when this function
        // returns, even if _queue.empty() or anything else in it throws.
        std::lock_guard<std::mutex> lk(_mutex);
        return !_queue.empty();
    }

    std::string getLine() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_queue.empty()) {
            // `return {};` list-initializes the return value from an empty
            // brace list. For std::string that calls the default constructor,
            // which yields an empty string. It's the same as saying
            // `return std::string();` (or `return "";`).
            return {};
        }
        // auto is magical. The compiler knows what type results from
        // `std::move(_queue.front())` (even if you don't) and makes sure
        // `line` gets the right type (here, std::string).
        //
        // Herb Sutter, one of the top dogs in the C++ world, basically says
        // "Almost Always Auto". Really good read:
        // http://herbsutter.com/2013/08/12/gotw-94-solution-aaa-style-almost-always-auto/
        //
        // std::move is a bit trickier. You'll kind of have to know what rvalue
        // references are for this to make sense. _queue.front() returns an
        // lvalue reference to the std::string at the front of the queue.
        // std::move casts it to an rvalue reference, so `line` is built with
        // std::string's move constructor, which can take over the existing
        // buffer instead of copying the characters. Without std::move, you'd
        // copy the first item in the queue only to pop (destroy) it on the
        // next line, which is wasteful.
        //
        // See http://www.bogotobogo.com/cplusplus/C11/5_C11_Move_Semantics_Rvalue_Reference.php
        // and http://en.cppreference.com/w/cpp/language/value_category
        auto line = std::move(_queue.front());
        _queue.pop();
        return line;
    }

    // True while the background thread may still produce lines or lines are
    // still waiting in the queue. Member functions defined inside the class
    // body are implicitly inline, so no `inline` keyword is needed.
    bool stillRunning() {
        return !_eof || isReady();
    }

private:
    // nullptr is excellent. Always use it over NULL or 0 when you can.
    std::istream* _stream = nullptr;
    // std::atomic_bool is a typedef for std::atomic<bool>. std::atomic is a
    // class template that lets you perform atomic operations on a variable
    // shared between threads. For atomic_bool, this essentially amounts to a
    // variable that different threads can safely read from and write to
    // without using a mutex.
    std::atomic_bool _eof;
    // Mutexes are synchronization primitives that I don't think I could
    // explain very well in a single comment. You should probably read up on
    // concurrency to get a better grasp of them. A quick Google search turns
    // up a lot of info on the topic:
    // http://www.cplusplusconcurrencyinaction.com/
    std::mutex _mutex;
    // Just a regular old FIFO queue.
    std::queue<std::string> _queue;
    // Threads! http://en.cppreference.com/w/cpp/thread/thread
    std::thread _thread;

    // The function that we run in a separate thread.
    void threadEntry() {
        std::string line;
        // Read from the stream we were given (not std::cin) so that any
        // std::istream works. std::getline returns the stream itself, which
        // evaluates to false in a bool context once it can't read anymore.
        while (std::getline(*_stream, line)) {
            std::lock_guard<std::mutex> lk(_mutex); // keep _queue thread safe!
            _queue.push(std::move(line));           // Another usage of std::move
        }
        // Technically eof isn't the only thing that can end this loop (a read
        // error would too), but either way we're saying we can't read anymore.
        _eof = true;
    }
};

int main() {
    // Instantiate the class using "uniform initialization" (brace) syntax.
    // See http://herbsutter.com/2013/05/09/gotw-1-solution/ for some serious
    // details.
    //
    // We're passing the memory address (by adding & in front of the variable)
    // of the global object std::cin. AsyncStreamReader takes a std::istream*
    // as its constructor argument, and std::cin is exactly that: a
    // std::istream.
    //
    // Conveniently, because we allow an arbitrary stream to be passed in, you
    // could also use a std::fstream or std::stringstream.
    //
    // See http://en.cppreference.com/w/cpp/io/basic_fstream
    // and http://en.cppreference.com/w/cpp/io/basic_stringstream
    //
    // Note that both of those derive from std::istream.
    AsyncStreamReader reader{&std::cin};
    while (true) {
        // Ask the reader whether it has input ready for us. isReady() is
        // thread safe, as are all of AsyncStreamReader's public member
        // functions. (With more than one consumer thread, another consumer
        // could grab the line between isReady() and getLine(); getLine() then
        // simply returns an empty string.)
        while (reader.isReady()) {
            // If there's input, grab it and print it out.
            std::cout << "input received: " << reader.getLine() << std::endl;
        }
        // Check whether the reader is still running. As long as it is, we keep
        // checking for input. It "runs" until it has no more input for us and
        // can't get any more.
        if (reader.stillRunning()) {
            std::cout << "still waiting..." << std::endl;
        } else {
            // eof = end of file. If you pipe input into the program
            // (cat filename | ./nonblocking), std::cin reads from the pipe.
            // Once the writing end of the pipe is closed and all of its data
            // has been read, std::cin hits eof; no special "eof character" is
            // sent. AsyncStreamReader "runs" until all of the input is
            // consumed and it hits eof. If you run it without a pipe (such as
            // from an interactive terminal), std::cin won't reach eof unless
            // you signal it yourself with Control-D (on Unix-like systems) at
            // the start of a line.
            std::cout << "eof" << std::endl;
            // Nothing more will arrive, so stop polling. The reader's
            // destructor then joins its already finished thread.
            break;
        }
        // Sleep for a little bit before polling again.
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
@SignalWhisperer

line 117 (the `while` loop in `threadEntry()`) should read: `while (std::getline(*_stream, line)) {`
