Skip to content

Instantly share code, notes, and snippets.

@HQarroum
Last active October 14, 2018 00:45
Show Gist options
  • Save HQarroum/92d36872600f098eb74a27ebc7ffba5c to your computer and use it in GitHub Desktop.
Save HQarroum/92d36872600f098eb74a27ebc7ffba5c to your computer and use it in GitHub Desktop.
Demonstrates how to publish records to a Kinesis Firehose stream from devices using the C++ AWS SDK.
#include <iostream>
#include <string>
#include <fstream>
#include <streambuf>
#include <chrono>
#include <thread>
#include <condition_variable>
#include <aws/core/Aws.h>
#include <aws/core/utils/Outcome.h>
#include <aws/firehose/FirehoseClient.h>
#include <aws/firehose/model/Record.h>
#include <aws/firehose/model/ListDeliveryStreamsRequest.h>
#include <aws/firehose/model/PutRecordRequest.h>
#include <aws/core/utils/threading/Executor.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include "./args.hpp"
#include "./semaphore.hpp"
/**
* The name of the Kinesis Firehose thread pool.
*/
static const char THREAD_POOL_NAME[] = "kinesis-thread-pool";
/**
* The number of threads which can run concurrently as part of the
* AWS SDK thread pool.
*/
static const uint8_t THREAD_POOL_SIZE = 10;
/**
* Application argument parser.
*/
args::ArgumentParser parser(
"This application is an example aiming at demonstrating how to publish data"
" to a Kinesis Firehose stream using AWS IAM or Cognito credentials."
);
/**
* Available flags declaration.
*/
args::HelpFlag help(parser, "help", "Display this help menu", {'h', "help"});
args::ValueFlag<std::string> stream(parser, "stream-name", "The Kinesis Firehose stream name to push data to.", {"stream-name"});
args::ValueFlag<std::string> data(parser, "data", "The data to push to the stream.", {"data"});
args::ValueFlag<std::string> awsKeyId(parser, "aws-key-id", "The AWS Access Key Id to use for authentication purposes.", {"aws-key-id"});
args::ValueFlag<std::string> awsSecretKey(parser, "aws-secret-key", "The AWS Secret Access Key to use for authentication purposes.", {"aws-secret-key"});
args::ValueFlag<std::string> awsSession(parser, "aws-session", "The AWS session key to use to send data to the stream.", {"aws-session"});
args::ValueFlag<std::string> awsRegion(parser, "region", "The AWS region to use.", {"aws-region"});
args::ValueFlag<int> iteration(parser, "iteration", "Number of iterations of pushing the data.", {"iteration"});
args::Flag enableLogs(parser, "enable-logs", "Whether to enable the AWS SDK logs for debugging purposes.", {"enable-logs"});
/**
* Loads the given file path content in memory and feeds it into
* a Firehose `Record`.
* \return a firehose `Record` filled with the given `data`.
*/
static const Aws::Firehose::Model::Record make_record(const char* data, size_t len) {
// The firehose record to feed.
Aws::Firehose::Model::Record record;
// Casting the raw data into a `ByteBuffer`.
auto buffer = Aws::Utils::ByteBuffer(reinterpret_cast<const unsigned char*>(data), len);
// Feeding the record object.
record.SetData(buffer);
return (record);
}
/**
* Creates a new `PutRecordRequest` object with the given `streamName` and
* the record filled with the content of `data`.
*/
static const Aws::Firehose::Model::PutRecordRequest make_request(const std::string& streamName, const char* data, size_t len) {
Aws::Firehose::Model::PutRecordRequest request;
request.SetDeliveryStreamName(Aws::String(streamName));
request.SetRecord(make_record(data, len));
return (request);
}
/**
* Asynchronously pushes the given data to the given Kinesis Firehose stream.
*/
static void push_data_to_stream(const Aws::Firehose::FirehoseClient& client, const std::string& streamName, const char* data, size_t len, const Aws::Firehose::PutRecordResponseReceivedHandler& callback) {
// Creating the `PutRecord` request out of the given parameters.
auto request = make_request(streamName, data, len);
// Asynchronously initiating a `PutRecord` request.
client.PutRecordAsync(request, callback);
}
/**
* \return an AWS credentials object filled with credentials
* provided by the command-line.
*/
static const Aws::Auth::AWSCredentials get_credentials() {
return (Aws::Auth::AWSCredentials(
Aws::String(args::get(awsKeyId)), Aws::String(args::get(awsSecretKey)), Aws::String(args::get(awsSession))
));
}
/**
* \return a new `FirehoseClient` instance filled by optional Cognito
* credentials, or associated with the local AWS credentials.
*/
static const Aws::Firehose::FirehoseClient get_client(const Aws::Client::ClientConfiguration& config) {
const auto cognito = awsKeyId && awsSecretKey && awsSession;
return (cognito ? Aws::Firehose::FirehoseClient(get_credentials(), config) : Aws::Firehose::FirehoseClient(config));
}
/**
* Application entry point.
*/
int entry_point(int iteration) {
semaphore_t semaphore{iteration};
// Creating the SDK options object.
Aws::SDKOptions options;
// Setting logging options.
if (enableLogs) {
options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Trace;
options.loggingOptions.logger_create_fn = [] {
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(Aws::Utils::Logging::LogLevel::Trace);
};
}
Aws::InitAPI(options);
{
// Initializing configuration with the specified region.
Aws::Client::ClientConfiguration config;
config.executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(THREAD_POOL_NAME, THREAD_POOL_SIZE);
config.region = awsRegion ? args::get(awsRegion) : Aws::Region::EU_WEST_1;
// Creating the firehose client.
const Aws::Firehose::FirehoseClient client = get_client(config);
// Callback receiving the result of the publish.
auto callback = [&semaphore](
const Aws::Firehose::FirehoseClient* client,
const Aws::Firehose::Model::PutRecordRequest& request,
const Aws::Firehose::Model::PutRecordOutcome& outcome,
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) {
outcome.IsSuccess() ?
std::cout << "[+] Record successfully pushed to the firehose stream." << std::endl :
std::cerr << "[!] " << outcome.GetError().GetMessage() << std::endl;
// Signaling that the request is done.
semaphore.notify();
};
// Retrieving data as constant string.
const char* buffer = args::get(data).c_str();
// For each specified iteration, we push the data into the stream.
while (iteration--) {
push_data_to_stream(client, args::get(stream), buffer, ::strlen(buffer), callback);
}
// Waiting for the in-flight requests to be done.
semaphore.wait();
}
Aws::ShutdownAPI(options);
return (EXIT_SUCCESS);
}
/**
* Takes its arguments from the command-line.
* The `main` function simulates an event loop by the usage
* of a condition variable which is notified once the request has
* been executed and a response has been received.
*/
int main(int argc, char* argv[])
{
try {
// Parsing cli arguments.
parser.ParseCLI(argc, argv);
if (!stream || !data) {
std::cout << parser;
std::exit(EXIT_FAILURE);
}
return (entry_point(iteration ? args::get(iteration) : 1));
} catch (const args::Help&) {
std::cout << parser;
} catch (const args::ParseError& e) {
std::cerr << e.what() << std::endl;
std::cerr << parser;
}
return (EXIT_FAILURE);
}
#include <mutex>
#include <condition_variable>
/**
* Simple semaphore wrapper class.
*/
class semaphore_t {
std::mutex mutex_;
std::condition_variable condition_;
int count_;
public:
/**
* \constructor
* Initializaes the counter.
*/
semaphore_t(int count = 0) : count_{count} {}
/**
* Notifies one listening thread of a count change.
*/
void notify() {
std::unique_lock<std::mutex> guard(mutex_);
--count_;
condition_.notify_one();
}
/**
* Awaits until the counter is set to zero.
*/
void wait() {
std::unique_lock<std::mutex> guard(mutex_);
while (count_ > 0) {
condition_.wait(guard);
}
}
};
@HQarroum
Copy link
Author

HQarroum commented Oct 14, 2018

Features

  • Uses the AWS SDK thread pool to limit the amount of concurrent requests performed by the device.
  • Allows to authenticate by using either AWS IAM or Cognito credentials.
  • Multiple calls to push_data_to_stream reuses the same underlying TCP/TLS connection initiated by the SDK, thus improving performances.

Dependencies

  • The C++ AWS SDK.
  • Args. A simple header-only C++ argument parser library.

Examples

  1. Using local IAM credentials
./push_records --stream-name=my-firehose-stream --data=`cat data.txt`
  1. Using Cognito credentials
./push_records --stream-name=my-firehose-stream --data=`cat data.txt` --aws-key-id=my-id --aws-secret-key=my-secret-key --aws-session=my-session
  1. Using optional parameters
./push_records --stream-name=my-firehose-stream --data=`cat data.txt` --region=eu-west-1 --iteration=10

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment