Skip to content

Instantly share code, notes, and snippets.

@vladaman
Created April 8, 2014 18:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vladaman/10171308 to your computer and use it in GitHub Desktop.
Save vladaman/10171308 to your computer and use it in GitHub Desktop.
PHP Script to split and merge Amazon Kinesis Streams
<?php
require 'aws-autoloader.php';
use Aws\Kinesis\KinesisClient;
// In real applications, the following code is part of your trusted code.
// It has your security credentials that you use to obtain temporary
// security credentials.
// Resource Policy to limit access to just some streams
/*
"Resource": [
"arn:aws:kinesis:us-east-1:*:stream/sink*"
]
*/
$AWSKey = 'AWSKey';
$AWSSecret = 'AwsSecret';
$StreamName = 'sink';
// Kinesis
$client = KinesisClient::factory(array(
'key' => $AWSKey,
'secret' => $AWSSecret,
'region' => 'us-east-1'
));
$result = $client->describeStream(array(
// StreamName is required
'StreamName' => $StreamName,
'Limit' => 20
));
$res = $result->get('StreamDescription');
print_r($result);
$ix = 0;
foreach ($res['Shards'] as $shard) {
if (isset($argv[1]) && $argv[1] == '/split') {
print_r($shard);
$newHashKeySplit = ($shard['HashKeyRange']['EndingHashKey'] - $shard['HashKeyRange']['StartingHashKey']) / 2;
$ln = readline(sprintf("Would you like to split %s shard (Y/n)?", $shard['ShardId']));
if ($ln == 'Y') {
splitShard($client, $StreamName, $shard['ShardId'], $newHashKeySplit);
}
}
if (isset($argv[1]) && $argv[1] == '/merge') {
printf("%d) %s Parent: %s", $ix, $shard['ShardId'], $shard['ParentShardId']);
if (isset($shard['AdjacentParentShardId'])) {
printf(" Adj: %s", $shard['AdjacentParentShardId']);
}
print("\n");
}
$ix++;
}
function splitShard($client, $streamName, $shard, $key)
{
$result = $client->splitShard(array(
// StreamName is required
'StreamName' => $streamName,
// ShardToSplit is required
'ShardToSplit' => $shard,
// NewStartingHashKey is required
'NewStartingHashKey' => sprintf("%0.0f", $key)
));
printf("Splitting Stream %s shard %s using key %s\n", $streamName, $shard, $key);
}
function mergeShards($client, $streamName, $shardToMerge, $shardAdjacent)
{
$result = $client->mergeShards(array(
// StreamName is required
'StreamName' => $streamName,
// ShardToMerge is required
'ShardToMerge' => $shardToMerge,
// AdjacentShardToMerge is required
'AdjacentShardToMerge' => $shardAdjacent
));
}
if (isset($argv[1]) && $argv[1] == '/merge') {
// Shards can be merged if their end and start sequence numbers continue
// mergeShards($client, $StreamName, "shardId-000000000005","shardId-000000000002");
}
// print_r($res);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment