Last active
October 14, 2018 00:45
-
-
Save HQarroum/92d36872600f098eb74a27ebc7ffba5c to your computer and use it in GitHub Desktop.
Demonstrates how to publish records to a Kinesis Firehose stream from devices using the C++ AWS SDK.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <string> | |
#include <fstream> | |
#include <streambuf> | |
#include <chrono> | |
#include <thread> | |
#include <condition_variable> | |
#include <aws/core/Aws.h> | |
#include <aws/core/utils/Outcome.h> | |
#include <aws/firehose/FirehoseClient.h> | |
#include <aws/firehose/model/Record.h> | |
#include <aws/firehose/model/ListDeliveryStreamsRequest.h> | |
#include <aws/firehose/model/PutRecordRequest.h> | |
#include <aws/core/utils/threading/Executor.h> | |
#include <aws/core/auth/AWSCredentialsProvider.h> | |
#include <aws/core/utils/logging/ConsoleLogSystem.h> | |
#include "./args.hpp" | |
#include "./semaphore.hpp" | |
/**
 * Tag passed as the first argument of `Aws::MakeShared` when the
 * thread pool executor of the Firehose client is created.
 */
static constexpr char THREAD_POOL_NAME[] = "kinesis-thread-pool";

/**
 * Size handed to the AWS SDK pooled thread executor, i.e. how many
 * requests may be processed concurrently.
 */
static constexpr uint8_t THREAD_POOL_SIZE = 10;
/** | |
* Application argument parser. | |
*/ | |
args::ArgumentParser parser( | |
"This application is an example aiming at demonstrating how to publish data" | |
" to a Kinesis Firehose stream using AWS IAM or Cognito credentials." | |
); | |
/** | |
* Available flags declaration. | |
*/ | |
args::HelpFlag help(parser, "help", "Display this help menu", {'h', "help"}); | |
args::ValueFlag<std::string> stream(parser, "stream-name", "The Kinesis Firehose stream name to push data to.", {"stream-name"}); | |
args::ValueFlag<std::string> data(parser, "data", "The data to push to the stream.", {"data"}); | |
args::ValueFlag<std::string> awsKeyId(parser, "aws-key-id", "The AWS Access Key Id to use for authentication purposes.", {"aws-key-id"}); | |
args::ValueFlag<std::string> awsSecretKey(parser, "aws-secret-key", "The AWS Secret Access Key to use for authentication purposes.", {"aws-secret-key"}); | |
args::ValueFlag<std::string> awsSession(parser, "aws-session", "The AWS session key to use to send data to the stream.", {"aws-session"}); | |
args::ValueFlag<std::string> awsRegion(parser, "region", "The AWS region to use.", {"aws-region"}); | |
args::ValueFlag<int> iteration(parser, "iteration", "Number of iterations of pushing the data.", {"iteration"}); | |
args::Flag enableLogs(parser, "enable-logs", "Whether to enable the AWS SDK logs for debugging purposes.", {"enable-logs"}); | |
/** | |
* Loads the given file path content in memory and feeds it into | |
* a Firehose `Record`. | |
* \return a firehose `Record` filled with the given `data`. | |
*/ | |
static const Aws::Firehose::Model::Record make_record(const char* data, size_t len) { | |
// The firehose record to feed. | |
Aws::Firehose::Model::Record record; | |
// Casting the raw data into a `ByteBuffer`. | |
auto buffer = Aws::Utils::ByteBuffer(reinterpret_cast<const unsigned char*>(data), len); | |
// Feeding the record object. | |
record.SetData(buffer); | |
return (record); | |
} | |
/** | |
* Creates a new `PutRecordRequest` object with the given `streamName` and | |
* the record filled with the content of `data`. | |
*/ | |
static const Aws::Firehose::Model::PutRecordRequest make_request(const std::string& streamName, const char* data, size_t len) { | |
Aws::Firehose::Model::PutRecordRequest request; | |
request.SetDeliveryStreamName(Aws::String(streamName)); | |
request.SetRecord(make_record(data, len)); | |
return (request); | |
} | |
/** | |
* Asynchronously pushes the given data to the given Kinesis Firehose stream. | |
*/ | |
static void push_data_to_stream(const Aws::Firehose::FirehoseClient& client, const std::string& streamName, const char* data, size_t len, const Aws::Firehose::PutRecordResponseReceivedHandler& callback) { | |
// Creating the `PutRecord` request out of the given parameters. | |
auto request = make_request(streamName, data, len); | |
// Asynchronously initiating a `PutRecord` request. | |
client.PutRecordAsync(request, callback); | |
} | |
/** | |
* \return an AWS credentials object filled with credentials | |
* provided by the command-line. | |
*/ | |
static const Aws::Auth::AWSCredentials get_credentials() { | |
return (Aws::Auth::AWSCredentials( | |
Aws::String(args::get(awsKeyId)), Aws::String(args::get(awsSecretKey)), Aws::String(args::get(awsSession)) | |
)); | |
} | |
/** | |
* \return a new `FirehoseClient` instance filled by optional Cognito | |
* credentials, or associated with the local AWS credentials. | |
*/ | |
static const Aws::Firehose::FirehoseClient get_client(const Aws::Client::ClientConfiguration& config) { | |
const auto cognito = awsKeyId && awsSecretKey && awsSession; | |
return (cognito ? Aws::Firehose::FirehoseClient(get_credentials(), config) : Aws::Firehose::FirehoseClient(config)); | |
} | |
/** | |
* Application entry point. | |
*/ | |
int entry_point(int iteration) { | |
semaphore_t semaphore{iteration}; | |
// Creating the SDK options object. | |
Aws::SDKOptions options; | |
// Setting logging options. | |
if (enableLogs) { | |
options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Trace; | |
options.loggingOptions.logger_create_fn = [] { | |
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(Aws::Utils::Logging::LogLevel::Trace); | |
}; | |
} | |
Aws::InitAPI(options); | |
{ | |
// Initializing configuration with the specified region. | |
Aws::Client::ClientConfiguration config; | |
config.executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(THREAD_POOL_NAME, THREAD_POOL_SIZE); | |
config.region = awsRegion ? args::get(awsRegion) : Aws::Region::EU_WEST_1; | |
// Creating the firehose client. | |
const Aws::Firehose::FirehoseClient client = get_client(config); | |
// Callback receiving the result of the publish. | |
auto callback = [&semaphore]( | |
const Aws::Firehose::FirehoseClient* client, | |
const Aws::Firehose::Model::PutRecordRequest& request, | |
const Aws::Firehose::Model::PutRecordOutcome& outcome, | |
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) { | |
outcome.IsSuccess() ? | |
std::cout << "[+] Record successfully pushed to the firehose stream." << std::endl : | |
std::cerr << "[!] " << outcome.GetError().GetMessage() << std::endl; | |
// Signaling that the request is done. | |
semaphore.notify(); | |
}; | |
// Retrieving data as constant string. | |
const char* buffer = args::get(data).c_str(); | |
// For each specified iteration, we push the data into the stream. | |
while (iteration--) { | |
push_data_to_stream(client, args::get(stream), buffer, ::strlen(buffer), callback); | |
} | |
// Waiting for the in-flight requests to be done. | |
semaphore.wait(); | |
} | |
Aws::ShutdownAPI(options); | |
return (EXIT_SUCCESS); | |
} | |
/** | |
* Takes its arguments from the command-line. | |
* The `main` function simulates an event loop by the usage | |
* of a condition variable which is notified once the request has | |
* been executed and a response has been received. | |
*/ | |
int main(int argc, char* argv[]) | |
{ | |
try { | |
// Parsing cli arguments. | |
parser.ParseCLI(argc, argv); | |
if (!stream || !data) { | |
std::cout << parser; | |
std::exit(EXIT_FAILURE); | |
} | |
return (entry_point(iteration ? args::get(iteration) : 1)); | |
} catch (const args::Help&) { | |
std::cout << parser; | |
} catch (const args::ParseError& e) { | |
std::cerr << e.what() << std::endl; | |
std::cerr << parser; | |
} | |
return (EXIT_FAILURE); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <mutex> | |
#include <condition_variable> | |
/**
 * Simple countdown wrapper built on a mutex and a condition variable.
 * The counter starts at the value given to the constructor, `notify()`
 * decrements it, and `wait()` blocks for as long as it is strictly positive.
 */
class semaphore_t {
  std::mutex mutex_;
  std::condition_variable condition_;
  int count_;

public:
  /**
   * \constructor
   * Initializes the counter.
   * \param count the number of `notify()` calls `wait()` has to await;
   * zero or a negative value makes `wait()` return immediately.
   */
  semaphore_t(int count = 0) : count_{count} {}

  /**
   * Decrements the counter and, once it is no longer positive,
   * wakes every thread blocked in `wait()`.
   */
  void notify() {
    std::lock_guard<std::mutex> guard(mutex_);

    // Waiters only care about the counter reaching zero, and all of them
    // must be released at that point, hence `notify_all`.
    if (--count_ <= 0) {
      condition_.notify_all();
    }
  }

  /**
   * Blocks the calling thread until the counter is no longer positive.
   */
  void wait() {
    std::unique_lock<std::mutex> guard(mutex_);

    // The predicate guards against spurious wake-ups.
    condition_.wait(guard, [this] { return count_ <= 0; });
  }
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Features
push_data_to_stream
reuses the same underlying TCP/TLS connection initiated by the SDK, thus improving performance.
Dependencies
Examples
./push_records --stream-name=my-firehose-stream --data="$(cat data.txt)"
./push_records --stream-name=my-firehose-stream --data="$(cat data.txt)" --aws-key-id=my-id --aws-secret-key=my-secret-key --aws-session=my-session
./push_records --stream-name=my-firehose-stream --data="$(cat data.txt)" --aws-region=eu-west-1 --iteration=10