Skip to content

Instantly share code, notes, and snippets.

@anapsix
Created December 15, 2017 17:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anapsix/7be3cc5dca634ccde15b503eee06fcd3 to your computer and use it in GitHub Desktop.
Kinesis Consumer in PHP with AWS SDK
<?php
// Kinesis consumer example using the AWS SDK for PHP.
//
// If running in Alpine, install the prerequisites first:
//   apk -U add php7 php7-mbstring php7-simplexml php7-json php7-phar php7-openssl curl
//   curl -sS https://getcomposer.org/installer | php
//   php composer.phar require aws/aws-sdk-php
//
// Credentials may be exported as environment variables:
//   export AWS_ACCESS_KEY_ID=...
//   export AWS_SECRET_ACCESS_KEY=...

// Stream to read: $KINESIS_STREAM when it is set and non-empty, otherwise the
// placeholder name below.
$streamName = getenv('KINESIS_STREAM') ?: 'my-kinesis-stream';

// Maximum number of records requested per GetRecords call.
$numberOfRecordsPerBatch = 10;

// The Composer autoloader (created by `composer require aws/aws-sdk-php` above)
// is needed to resolve the Aws\* classes used below; without it the first
// reference to CredentialProvider is a fatal "class not found" error.
require_once 'vendor/autoload.php';

echo "Getting messages from {$streamName} in batches of {$numberOfRecordsPerBatch}\n";
use Aws\Credentials\CredentialProvider;
use Aws\Kinesis\KinesisClient;
use Aws\Sts\StsClient;
// Resolve credentials lazily: try each provider in order and cache the first
// successful result via memoize().
//   1. AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables
//   2. the shared ini credentials file
//   3. the EC2 instance profile
//   4. assuming the IAM role below through STS
// Provider docs:
// http://docs.aws.amazon.com/aws-sdk-php/v3/api/class-Aws.Credentials.CredentialProvider.html
$provider = CredentialProvider::memoize(CredentialProvider::chain(
    // The setup notes at the top of this script export AWS_ACCESS_KEY_ID /
    // AWS_SECRET_ACCESS_KEY, so honour them first (the SDK's default order).
    CredentialProvider::env(),
    CredentialProvider::ini(),
    CredentialProvider::instanceProfile(),
    CredentialProvider::assumeRole([
        // STS client used only to perform the AssumeRole call.
        'client' => new StsClient(['region' => 'us-east-1', 'version' => 'latest']),
        'assume_role_params' => [
            // NOTE(review): placeholder account id / role — replace with a real role ARN.
            'RoleArn' => 'arn:aws:iam::555555555555:role/Admin',
            'RoleSessionName' => 'test_session',
        ],
    ])
));
// Kinesis client used for every API call below.
// All connection options:
// https://github.com/aws/aws-sdk-php/blob/master/docs/guide/configuration.rst
// Equivalent ways to build it: `(new \Aws\Sdk())->createKinesis([...])`, or the
// older `KinesisClient::factory([...])`.
$kinesisClient = new KinesisClient([
    'region' => 'us-east-2',
    'version' => 'latest',
    'credentials' => $provider,
    // 'debug' => true,
    // Maximum number of retries for a failed request.
    'retries' => 10,
    // NOTE(review): `delay` and `synchronous` are documented as `http` options;
    // at the top level of the client config they may have no effect — confirm
    // the intent before moving them into the 'http' array.
    'delay' => 1000,
    'synchronous' => true,
    'http' => [
        'timeout' => 5,
        'connect_timeout' => 5,
        // Keep TLS certificate verification on: turning it off lets a
        // man-in-the-middle intercept the signed requests and the stream data.
        'verify' => true,
    ],
]);
// Collect every shard id in the stream. DescribeStream returns the shard list a
// page at a time (StreamDescription.HasMoreShards is true while more remain), so
// keep requesting pages that start after the last shard id already seen.
$shardIds = [];
$describeParams = ['StreamName' => $streamName];
do {
    $res = $kinesisClient->describeStream($describeParams);
    $page = $res->search('StreamDescription.Shards[].ShardId') ?: [];
    $shardIds = array_merge($shardIds, $page);
    $hasMoreShards = (bool) $res->search('StreamDescription.HasMoreShards');
    if ($page !== []) {
        $describeParams['ExclusiveStartShardId'] = $page[count($page) - 1];
    }
    // An empty page cannot advance ExclusiveStartShardId, so stop to avoid an
    // endless loop even if HasMoreShards is still set.
} while ($hasMoreShards && $page !== []);

$count = 0;                    // records read across all shards
$startTime = microtime(true);  // start of the timed read phase
foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";

    // Start from the oldest record retained in the shard. Other iterator types:
    // AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER (these need
    // 'StartingSequenceNumber') and LATEST.
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'TRIM_HORIZON',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');

    // Read batches until caught up with the shard tip (MillisBehindLatest is 0)
    // or there is no further iterator to follow (e.g. the shard was closed).
    do {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator,
        ]);
        $shardIterator = $res->get('NextShardIterator');
        $behind = $res->get('MillisBehindLatest');

        $localCount = 0;
        // Only sequence numbers are reported; each record's payload is
        // available as Records[].Data if it needs processing.
        foreach (($res->search('Records[].SequenceNumber') ?: []) as $sequenceNumber) {
            echo "- [$sequenceNumber]\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";
        // NOTE(review): GetRecords is rate-limited per shard; add a short pause
        // here (e.g. sleep(1)) if throttling errors show up.
    } while ($behind > 0 && $shardIterator !== null && $shardIterator !== '');
}
// Report throughput for the read phase. With no records read the per-message
// time is undefined (and `/ 0` throws DivisionByZeroError on PHP 8), so it is
// only computed when at least one message was consumed.
$duration = microtime(true) - $startTime;
echo "Total Messages: " . $count . "\n";
echo "Total Duration: " . round($duration) . " seconds\n";
if ($count > 0) {
    $timePerMessage = $duration * 1000 / $count;
    echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
} else {
    echo "Time per message: n/a (no messages read)\n";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment