AWS Kinesis Example for PHP (using the AWS SDK for PHP)
<?php
// curl -sS https://getcomposer.org/installer | php
// php composer.phar require aws/aws-sdk-php
// export AWS_ACCESS_KEY_ID=...
// export AWS_SECRET_ACCESS_KEY=...

$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';
$numberOfRecordsPerBatch = 10000; // GetRecords returns at most 10,000 records per call

require_once 'vendor/autoload.php';

$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis(['region' => 'eu-west-1', 'version' => '2013-12-02']);

// get all shard ids
$res = $kinesisClient->describeStream(['StreamName' => $streamName]);
$shardIds = $res->search('StreamDescription.Shards[].ShardId');

$count = 0;
$startTime = microtime(true);

foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";

    // get initial shard iterator
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'TRIM_HORIZON', // 'AT_SEQUENCE_NUMBER|AFTER_SEQUENCE_NUMBER|TRIM_HORIZON|LATEST'
        // 'StartingSequenceNumber' => '<string>',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');

    do {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator,
        ]);
        // iterator for the next call; null once a closed shard has been read to the end
        $shardIterator = $res->get('NextShardIterator');

        $localCount = 0;
        foreach ($res->search('Records[].[SequenceNumber, Data]') as $data) {
            list($sequenceNumber, $item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";

        sleep(1); // stay under the per-shard limit of 5 GetRecords calls per second
    } while ($localCount > 0 && $shardIterator !== null);
}

$duration = microtime(true) - $startTime;
echo "Total Duration: " . round($duration) . " seconds\n";
if ($count > 0) {
    // guard against division by zero when no records were read
    $timePerMessage = $duration * 1000 / $count;
    echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
}
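Besides the shard IDs, `describeStream` also returns each shard's `HashKeyRange` (`StartingHashKey` and `EndingHashKey`, as decimal strings). Kinesis maps a record's partition key to a 128-bit integer with MD5 and stores the record in the shard whose range contains that value. The sketch below reproduces that routing locally, which can help predict which shard a producer record lands in. The helper names `hexToDecimal`, `compareDecimal` and `shardForPartitionKey` are hypothetical, and the 128-bit arithmetic is done on decimal strings (an assumption made so it runs without the bcmath or GMP extensions).

```php
<?php
// Hypothetical helper: convert a hex string to a decimal string of any size.
function hexToDecimal(string $hex): string
{
    $digits = [0]; // base-10 digits, least significant first
    foreach (str_split(strtolower($hex)) as $hexDigit) {
        $carry = hexdec($hexDigit);
        // multiply the current value by 16 and add the new hex digit
        foreach ($digits as $i => $digit) {
            $value = $digit * 16 + $carry;
            $digits[$i] = $value % 10;
            $carry = intdiv($value, 10);
        }
        while ($carry > 0) {
            $digits[] = $carry % 10;
            $carry = intdiv($carry, 10);
        }
    }
    $decimal = ltrim(implode('', array_reverse($digits)), '0');
    return $decimal === '' ? '0' : $decimal;
}

// Hypothetical helper: compare non-negative decimal strings without leading zeros (-1, 0, 1).
function compareDecimal(string $a, string $b): int
{
    if (strlen($a) !== strlen($b)) {
        return strlen($a) <=> strlen($b);
    }
    return strcmp($a, $b) <=> 0;
}

// Hypothetical helper: return the open shard whose HashKeyRange contains MD5($partitionKey).
function shardForPartitionKey(array $shards, string $partitionKey): ?string
{
    $hashKey = hexToDecimal(md5($partitionKey));
    foreach ($shards as $shard) {
        // after resharding, closed parent shards (with an EndingSequenceNumber) still appear
        if (isset($shard['SequenceNumberRange']['EndingSequenceNumber'])) {
            continue;
        }
        $range = $shard['HashKeyRange'];
        if (compareDecimal($hashKey, $range['StartingHashKey']) >= 0
            && compareDecimal($hashKey, $range['EndingHashKey']) <= 0) {
            return $shard['ShardId'];
        }
    }
    return null;
}

// Two evenly split shards, shaped like describeStream's StreamDescription.Shards
$shards = [
    ['ShardId' => 'shardId-000000000000', 'HashKeyRange' => [
        'StartingHashKey' => '0',
        'EndingHashKey' => '170141183460469231731687303715884105727']],
    ['ShardId' => 'shardId-000000000001', 'HashKeyRange' => [
        'StartingHashKey' => '170141183460469231731687303715884105728',
        'EndingHashKey' => '340282366920938463463374607431768211455']],
];
echo shardForPartitionKey($shards, 'a'), "\n"; // shardId-000000000000
```

Against a real stream, the same array shape comes from `$kinesisClient->describeStream(['StreamName' => $streamName])->search('StreamDescription.Shards')`.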
<?php
// curl -sS https://getcomposer.org/installer | php
// php composer.phar require aws/aws-sdk-php
// export AWS_ACCESS_KEY_ID=...
// export AWS_SECRET_ACCESS_KEY=...

$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';
$totalNumberOfRecords = 10000;

require_once 'vendor/autoload.php';

$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis(['region' => 'eu-west-1', 'version' => '2013-12-02']);

/**
 * Simple buffer that batches messages before passing them to a callback
 */
class Buffer {
    protected $callback;
    protected $size;
    protected $data = [];

    // default of 500: the maximum number of records a single PutRecords request accepts
    public function __construct(callable $callback, $size = 500) {
        $this->callback = $callback;
        $this->size = $size;
    }

    public function add($item) {
        $this->data[] = $item;
        if (count($this->data) >= $this->size) {
            $this->flush();
        }
    }

    public function reset() {
        $this->data = [];
    }

    public function flush() {
        if (count($this->data) > 0) {
            call_user_func($this->callback, $this->data);
            $this->reset();
        }
    }
}

$buffer = new Buffer(function (array $data) use ($kinesisClient, $streamName) {
    echo "Flushing\n";
    $parameter = ['StreamName' => $streamName, 'Records' => []];
    foreach ($data as $item) {
        $parameter['Records'][] = [
            'Data' => $item,
            'PartitionKey' => md5($item), // distinct payloads get distinct keys, spreading records across shards
        ];
    }
    $res = $kinesisClient->putRecords($parameter);
    // PutRecords is not all-or-nothing: individual records can fail even when the call succeeds
    echo "Failed records: {$res->get('FailedRecordCount')}\n";
});

$startTime = microtime(true);
for ($i = 0; $i < $totalNumberOfRecords; $i++) {
    $buffer->add(json_encode([
        'id' => rand(0, 10000),
        'title' => 'Foo',
    ]));
}
$buffer->flush(); // send whatever is still buffered

$duration = microtime(true) - $startTime;
$timePerMessage = $duration * 1000 / $totalNumberOfRecords;
echo "Total Duration: " . round($duration) . " seconds\n";
echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
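The producer only prints `FailedRecordCount`. The PutRecords response also carries a `Records` array with one entry per request record, in request order; failed entries have `ErrorCode` (for example `ProvisionedThroughputExceededException` or `InternalFailure`) and `ErrorMessage` instead of `SequenceNumber` and `ShardId`. A minimal sketch of resending only the failed records with exponential backoff might look like this. `failedRecords` and `putWithRetry` are hypothetical helpers, and `$put` is assumed to be a callable that sends a list of records and returns that `Records` array, so the logic can run offline against a fake.

```php
<?php
// Hypothetical helper: request records whose PutRecords result entry has an ErrorCode.
function failedRecords(array $requestRecords, array $resultRecords): array
{
    $failed = [];
    foreach ($resultRecords as $i => $result) {
        if (isset($result['ErrorCode'])) {
            $failed[] = $requestRecords[$i]; // results are in request order
        }
    }
    return $failed;
}

// Hypothetical helper: send via $put, resend only the failures with exponential backoff.
// Returns the records that still failed after $maxAttempts.
function putWithRetry(callable $put, array $records, int $maxAttempts = 3, int $baseDelayMs = 100): array
{
    for ($attempt = 1; $attempt <= $maxAttempts && count($records) > 0; $attempt++) {
        if ($attempt > 1) {
            usleep($baseDelayMs * 1000 * (2 ** ($attempt - 2))); // base, 2x base, 4x base, ...
        }
        $records = failedRecords($records, $put($records));
    }
    return $records;
}

// Usage with a fake $put that rejects the first record on the first call only
$calls = 0;
$fakePut = function (array $records) use (&$calls): array {
    $calls++;
    $results = [];
    foreach ($records as $i => $record) {
        $results[] = ($calls === 1 && $i === 0)
            ? ['ErrorCode' => 'ProvisionedThroughputExceededException', 'ErrorMessage' => 'Rate exceeded']
            : ['SequenceNumber' => (string) $calls . $i, 'ShardId' => 'shardId-000000000000'];
    }
    return $results;
};
$stillFailing = putWithRetry($fakePut, [
    ['Data' => '{"id":1}', 'PartitionKey' => 'a'],
    ['Data' => '{"id":2}', 'PartitionKey' => 'b'],
], 3, 1);
echo count($stillFailing), " failed after $calls calls\n"; // 0 failed after 2 calls
```

In the producer's buffer callback, `$put` could wrap `$kinesisClient->putRecords(['StreamName' => $streamName, 'Records' => $records])->get('Records')`; whatever `putWithRetry` returns still needs to be logged or stored.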
@ebylund commented Jan 12, 2017

Thanks for this sample code. It saved me some time, since I didn't have to write this from scratch just to play with Kinesis. I have one suggestion, either as a fix or as another option for how this should work. In the consumer code, you exit the while loop when the record count isn't greater than 0. There could potentially be more data in the stream, but the loop is terminated too early. You might not see this if the stream has ample data, but a stream with gaps in the data will break out early (which is likely if you're looking at this code). My suggested fix is simply to change the while condition to while ($millisBehindLatest > 0), where $millisBehindLatest is the MillisBehindLatest value of the latest getRecords response. This lets the iterator advance from TRIM_HORIZON all the way to LATEST and makes sure all records in the stream are processed.
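A minimal sketch of the loop condition suggested above, assuming a stub callable in place of the real client so it runs offline. `readShardUntilCaughtUp` and the canned responses are hypothetical; the response keys (`Records`, `NextShardIterator`, `MillisBehindLatest`) follow the GetRecords result. The loop also stops when `NextShardIterator` is null, which GetRecords returns once a closed shard has been read to the end. On a live stream, `MillisBehindLatest` reaching 0 only means the reader has caught up at that moment.

```php
<?php
// Hypothetical helper: read one shard until the consumer has caught up with the tip.
// $getRecords receives a shard iterator and returns an array shaped like a GetRecords result.
function readShardUntilCaughtUp(callable $getRecords, string $shardIterator): array
{
    $sequenceNumbers = [];
    do {
        $res = $getRecords($shardIterator);
        foreach ($res['Records'] as $record) {
            $sequenceNumbers[] = $record['SequenceNumber']; // process the record here
        }
        $shardIterator = $res['NextShardIterator'] ?? null; // null: closed shard fully read
        $millisBehindLatest = $res['MillisBehindLatest'] ?? 0;
    } while ($shardIterator !== null && $millisBehindLatest > 0);
    return $sequenceNumbers;
}

// Canned responses with an empty batch in the middle, which the original
// `while ($localCount > 0)` loop would treat as the end of the shard.
$responses = [
    'it-1' => ['Records' => [['SequenceNumber' => '1']], 'NextShardIterator' => 'it-2', 'MillisBehindLatest' => 60000],
    'it-2' => ['Records' => [], 'NextShardIterator' => 'it-3', 'MillisBehindLatest' => 30000],
    'it-3' => ['Records' => [['SequenceNumber' => '2'], ['SequenceNumber' => '3']], 'NextShardIterator' => 'it-4', 'MillisBehindLatest' => 0],
];
$fakeGetRecords = function (string $iterator) use ($responses): array {
    return $responses[$iterator];
};
echo implode(', ', readShardUntilCaughtUp($fakeGetRecords, 'it-1')), "\n"; // 1, 2, 3
```

With the gist's client, `$getRecords` could wrap `$kinesisClient->getRecords(['ShardIterator' => $iterator, 'Limit' => $numberOfRecordsPerBatch])->toArray()`, keeping the `sleep(1)` between calls to respect the per-shard read limit.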

@bachloxo commented Aug 6, 2018

Thanks so much

@harshini66 commented Jan 30, 2019

Thanks for the sample code, which works well.

@filipetomita commented Nov 6, 2019

Thanks for this sample code!

@mangelsnc commented Oct 1, 2020

Hi! I'm new to Kinesis and I think I am missing something.

I usually work with queue systems such as RabbitMQ and others based on AMQP. Is there any way in Kinesis to mark a record as “processed”? I mean, how can I remove a record from the stream?

I’m just testing Kinesis as a kind of queue system (a client requirement for a project).
I created a consumer and a producer pretty similar to yours, but I want to put some code after line 46 of your consumer that tells Kinesis that this record has been processed and should not be delivered again (in AMQP this is usually an ACK or a NACK).
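Kinesis has no per-record ACK or delete: records stay in the stream until the retention period (24 hours by default) expires, and any consumer reading the shard sees them. Consumers instead remember how far they have read (checkpointing) and, on restart, ask for an iterator of type `AFTER_SEQUENCE_NUMBER` with the saved `StartingSequenceNumber`, both of which appear in the consumer's `getShardIterator` comment. The Kinesis Client Library (KCL) keeps such checkpoints in a DynamoDB table. Below is a minimal sketch assuming a local JSON file is an acceptable store; `FileCheckpointStore` and `shardIteratorParams` are hypothetical names, not part of the AWS SDK.

```php
<?php
// Hypothetical checkpoint store: last processed sequence number per shard, in a JSON file.
class FileCheckpointStore
{
    private $path;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    public function get(string $shardId): ?string
    {
        return $this->load()[$shardId] ?? null;
    }

    public function save(string $shardId, string $sequenceNumber): void
    {
        $all = $this->load();
        $all[$shardId] = $sequenceNumber;
        file_put_contents($this->path, json_encode($all), LOCK_EX);
    }

    private function load(): array
    {
        if (!is_file($this->path)) {
            return [];
        }
        return json_decode((string) file_get_contents($this->path), true) ?: [];
    }
}

// Hypothetical helper: getShardIterator() parameters that resume after the checkpoint, if any.
function shardIteratorParams(FileCheckpointStore $store, string $streamName, string $shardId): array
{
    $params = ['StreamName' => $streamName, 'ShardId' => $shardId];
    $checkpoint = $store->get($shardId);
    if ($checkpoint === null) {
        $params['ShardIteratorType'] = 'TRIM_HORIZON'; // nothing processed yet
    } else {
        $params['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER';
        $params['StartingSequenceNumber'] = $checkpoint;
    }
    return $params;
}

$path = sys_get_temp_dir() . '/kinesis-checkpoints-' . getmypid() . '.json';
$store = new FileCheckpointStore($path);
$first = shardIteratorParams($store, 'my-stream', 'shardId-000000000000');
$store->save('shardId-000000000000', '12345'); // example sequence number
$resumed = shardIteratorParams($store, 'my-stream', 'shardId-000000000000');
echo $first['ShardIteratorType'], ' -> ', $resumed['ShardIteratorType'], "\n"; // TRIM_HORIZON -> AFTER_SEQUENCE_NUMBER
unlink($path);
```

In the gist's consumer, `$store->save($shardId, $sequenceNumber)` would go after a record (or, more cheaply, a whole batch) is processed, and `shardIteratorParams(...)` would replace the hard-coded array passed to `$kinesisClient->getShardIterator()`. This gives at-least-once processing: a crash between processing and saving means those records are read again.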
