Skip to content

Instantly share code, notes, and snippets.

@dcorrea777
Created March 16, 2022 18:25
Show Gist options
  • Save dcorrea777/e4e520a4bc71279de4ad72b098ce81d3 to your computer and use it in GitHub Desktop.
Save dcorrea777/e4e520a4bc71279de4ad72b098ce81d3 to your computer and use it in GitHub Desktop.
Dynamodb Stream example php
<?php
declare(strict_types=1);
use Aws\DynamoDb\Exception\DynamoDbException;
use Aws\DynamoDbStreams\DynamoDbStreamsClient;
$container = require_once __DIR__ . '/bootstrap.php';
$config = $container->get('dynamodb');
$d = new DynamoDbStreamsClient($config);
try {
$lastEvaluatedShardId = null;
$arn = 'arn:aws:dynamodb:ddblocal:000000000000:table/notify/stream/2022-03-14T17:09:30.224';
do {
$e = ['StreamArn' => $arn];
if ($lastEvaluatedShardId !== null) {
$e['ExclusiveStartShardId'] = $lastEvaluatedShardId;
}
$describe = $d->describeStream($e);
$shards = $describe['StreamDescription']['Shards'];
foreach ($shards as $shard) {
echo "shard id: {$shard['ShardId']}", PHP_EOL;
$iterator = $d->getShardIterator([
'ShardId' => $shard['ShardId'],
'ShardIteratorType' => 'LATEST',
'StreamArn' => $arn,
])['ShardIterator'];
echo "interator id: {$iterator}", PHP_EOL;
while($iterator !== null) {
sleep(1);
$records = $d->getRecords(['ShardIterator' => $iterator]);
echo "record: " . json_encode($records['Records']), PHP_EOL;
$iterator = $records['NextShardIterator'];
}
}
$lastEvaluatedShardId = $describe['StreamDescription']['LastEvaluatedShardId'];
} while (!$lastEvaluatedShardId);
} catch (DynamoDbException $e) {
echo 'Unable to create table', PHP_EOL;
echo $e->getMessage(), PHP_EOL;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment