@fbrnc
Last active February 13, 2023 11:25
AWS Kinesis Example for PHP (using the AWS SDK for PHP)
<?php
// curl -sS https://getcomposer.org/installer | php
// php composer.phar require aws/aws-sdk-php
// export AWS_ACCESS_KEY_ID=...
// export AWS_SECRET_ACCESS_KEY=...
$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';
$numberOfRecordsPerBatch = 10000;
require_once 'vendor/autoload.php';
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis(['region' => 'eu-west-1', 'version' => '2013-12-02']);
// get all shard ids
$res = $kinesisClient->describeStream([ 'StreamName' => $streamName ]);
$shardIds = $res->search('StreamDescription.Shards[].ShardId');
$count = 0;
$startTime = microtime(true);
foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";
    // get initial shard iterator
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'TRIM_HORIZON', // 'AT_SEQUENCE_NUMBER|AFTER_SEQUENCE_NUMBER|TRIM_HORIZON|LATEST'
        // 'StartingSequenceNumber' => '<string>',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');
    do {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator
        ]);
        $shardIterator = $res->get('NextShardIterator');
        $localCount = 0;
        foreach ($res->search('Records[].[SequenceNumber, Data]') as $data) {
            list($sequenceNumber, $item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";
        sleep(1);
    } while ($localCount > 0 && $shardIterator !== null); // a closed shard returns no next iterator
}
$duration = microtime(true) - $startTime;
$timePerMessage = $count > 0 ? $duration*1000 / $count : 0; // avoid division by zero on an empty stream
echo "Total Duration: " . round($duration) . " seconds\n";
echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
<?php
// curl -sS https://getcomposer.org/installer | php
// php composer.phar require aws/aws-sdk-php
// export AWS_ACCESS_KEY_ID=...
// export AWS_SECRET_ACCESS_KEY=...
$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';
$totalNumberOfRecords = 10000;
require_once 'vendor/autoload.php';
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis(['region' => 'eu-west-1', 'version' => '2013-12-02']);
/**
 * Simple buffer that batches messages before passing them to a callback
 */
class Buffer {
    protected $callback;
    protected $size;
    protected $data = [];
    public function __construct($callback, $size=500) {
        $this->callback = $callback;
        $this->size = $size;
    }
    public function add($item) {
        $this->data[] = $item;
        if (count($this->data) >= $this->size) {
            $this->flush();
        }
    }
    public function reset() {
        $this->data = [];
    }
    public function flush() {
        if (count($this->data) > 0) {
            call_user_func($this->callback, $this->data);
            $this->reset();
        }
    }
}
$buffer = new Buffer(function(array $data) use ($kinesisClient, $streamName) {
    echo "Flushing\n";
    $parameter = [ 'StreamName' => $streamName, 'Records' => []];
    foreach ($data as $item) {
        $parameter['Records'][] = [
            'Data' => $item,
            'PartitionKey' => md5($item)
        ];
    }
    $res = $kinesisClient->putRecords($parameter);
    echo "Failed records: {$res->get('FailedRecordCount')}\n";
});
$startTime = microtime(true);
for ($i=0; $i<$totalNumberOfRecords; $i++) {
    $buffer->add(json_encode([
        'id' => rand(0, 10000),
        'title' => 'Foo'
    ]));
}
$buffer->flush();
$duration = microtime(true) - $startTime;
$timePerMessage = $duration*1000 / $totalNumberOfRecords;
echo "Total Duration: " . round($duration) . " seconds\n";
echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
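The producer above only prints `FailedRecordCount`. A `putRecords` call can succeed as a whole while individual records are rejected, for example because of throttling. The sketch below shows one way to collect the rejected entries for a retry. It relies on the `putRecords` response listing one result per request entry, in the same order, with an `ErrorCode` on failed entries. The helper name `collectFailedRecords` is hypothetical and not part of the SDK, and the request/response pair is simulated so that no AWS call is made.

```php
<?php
/**
 * Return the request entries whose matching response entry carries an ErrorCode.
 * (Hypothetical helper, not part of the AWS SDK.)
 *
 * @param array $requestRecords  the 'Records' array sent to putRecords
 * @param array $responseRecords the 'Records' array from the putRecords result
 * @return array request entries that should be sent again
 */
function collectFailedRecords(array $requestRecords, array $responseRecords): array
{
    $failed = [];
    foreach ($responseRecords as $index => $result) {
        if (isset($result['ErrorCode']) && isset($requestRecords[$index])) {
            $failed[] = $requestRecords[$index];
        }
    }
    return $failed;
}

// Simulated request/response pair (made-up values); no AWS call is made here.
$request = [
    ['Data' => 'a', 'PartitionKey' => md5('a')],
    ['Data' => 'b', 'PartitionKey' => md5('b')],
    ['Data' => 'c', 'PartitionKey' => md5('c')],
];
$response = [
    ['SequenceNumber' => '1', 'ShardId' => 'shardId-000000000000'],
    ['ErrorCode' => 'ProvisionedThroughputExceededException', 'ErrorMessage' => 'Rate exceeded'],
    ['SequenceNumber' => '2', 'ShardId' => 'shardId-000000000000'],
];

$retry = collectFailedRecords($request, $response);
echo count($retry) . " record(s) to retry\n";
```

Inside the flush callback this could be called as `collectFailedRecords($parameter['Records'], $res->get('Records'))`, with the result passed to another `putRecords` call, ideally after a short pause.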
@harshini66

Thanks for the sample code, which works well.

@filipetomita

Thanks for this sample code!

@mangelsnc

Hi! I'm new to Kinesis and I think I am missing something.

I usually work with queue systems such as RabbitMQ and others based on AMQP. Is there any way in Kinesis to mark a record as "processed"? I mean, how can I remove a record from the stream?

I'm just testing Kinesis as a kind of queue system (a client requirement for a project).
I created a consumer and a producer pretty similar to yours, but I want to put some code after line 46 of your consumer that tells Kinesis that this record has been processed and should not be sent again (in AMQP this is usually an ACK or a NACK).
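
Kinesis has no ACK/NACK equivalent: a consumer cannot remove a record, and records stay in the stream until its retention period expires. A consumer instead remembers the last `SequenceNumber` it processed and resumes after it, using the `AFTER_SEQUENCE_NUMBER` iterator type and `StartingSequenceNumber` parameter that the consumer above mentions in its comments. The sketch below builds the `getShardIterator` parameters from such a checkpoint. The helper name `buildIteratorParams`, the stream name, the sequence number, and the idea of keeping checkpoints in an array keyed by shard id are all assumptions for illustration. A real consumer would persist the checkpoints (file, database) between runs.

```php
<?php
/**
 * Build getShardIterator parameters that resume after the last processed record.
 * Falls back to TRIM_HORIZON when no checkpoint exists for the shard.
 * (Hypothetical helper, not part of the AWS SDK.)
 */
function buildIteratorParams(string $streamName, string $shardId, array $checkpoints): array
{
    $params = ['StreamName' => $streamName, 'ShardId' => $shardId];
    if (isset($checkpoints[$shardId])) {
        $params['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER';
        $params['StartingSequenceNumber'] = $checkpoints[$shardId];
    } else {
        $params['ShardIteratorType'] = 'TRIM_HORIZON';
    }
    return $params;
}

// Checkpoints as they might look after an earlier run (made-up sequence number).
$checkpoints = [
    'shardId-000000000000' => '49500000000000000000000000000000000000000000000000000001',
];

$resume = buildIteratorParams('my-stream', 'shardId-000000000000', $checkpoints);
$fresh  = buildIteratorParams('my-stream', 'shardId-000000000001', $checkpoints);

echo $resume['ShardIteratorType'] . "\n"; // shard with a checkpoint
echo $fresh['ShardIteratorType'] . "\n";  // shard without a checkpoint
```

In the consumer, the returned array would be passed to `$kinesisClient->getShardIterator(...)`, and `$checkpoints[$shardId] = $sequenceNumber;` would be set after each record has been processed successfully.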
