Skip to content

Instantly share code, notes, and snippets.

@Ron3
Created March 24, 2017 10:16
Show Gist options
  • Save Ron3/4caac0c50d54a3c8743a5799b8242e49 to your computer and use it in GitHub Desktop.
Save Ron3/4caac0c50d54a3c8743a5799b8242e49 to your computer and use it in GitHub Desktop.
AWS Kinesis Stream
2017年3月20日 by Ron
Email: luodm03@qq.com(如有错误, 欢迎指证.如有问题,欢迎讨论)
配套代码的git地址:
git: git@github.com:Ron3/AWS-Kinesis.git
https: https://github.com/Ron3/AWS-Kinesis.git
一直以来,我都对Amazon Kinesis Stream(以下称Kinesis)充满着好奇,一直在慢慢啃文档,但相对来说,还是不够仔细的.那既然看了
就写个我对Kinesis的理解与总结吧.我第一次接触这个概念,应该是在第一次参加AWS会议的时候,卓动--张穗文分享了他们如何利用Kinesis做用户
行为分析的.然而,我也正好曾经做过一个类似的,但是我们没有借助Kinesis来做,是写把LOG写成文件,然后采用Rsync同步到另外一台机器,在写入MySQL.
由于代码的健壮性写的不太好,总是有一些小问题,后来我也是借助其他类似Kinesis的组件重构掉的.
对于没有接触过的同学来说,可能比较好奇这个是什么东西,到底应该怎么样用.那我说说我的场景吧.游戏玩家,在操作的时候,是要统计一些数据来进行
分析的,来改善游戏不足的地方.例如,玩家登录,登出,付费,进入关卡,挑战关卡成功,或者挑战失败等等,这些都是写入到Kinesis,然后在写个应用程序,
把数据读取出来,汇总,写入MySQL(如果数据量庞大的用户,可以看看aws的RedShift.这个东西,稍后我应该会去看看).接着在写个Web统计后台,把数据查询
显示出来.而通常用户做一个操作.到我查询后台能看到这些,一般2S内即可.
那我们还是从现实具象化去描述Kinesis.这个东西有点像水管,一断不停的写入数据,另外一端不停的读取数据(这样描述不精确的,只是让大家在脑海中有个模型)
具体的可以看下图(以下的资料,大部分来自官方文档)
图片地址: https://docs.aws.amazon.com/zh_cn/streams/latest/dev/key-concepts.html
Streams 高级别架构
Kinesis是一条巨大的"水"管,而数据流入到"水管"中后,他需要一个叫Shard(分片)的东西来承载.Shard才是真正的数据载体.
那这Kinesis和Shard之间到底是一怎么样的关系,在我脑海中,就是有点像,一条巨大的电缆里面,还会分为很多条小的电缆,而真正
的数据传输,就是在小的电缆上进行的(但这样描述,应该是不够精确的,因为Shard他其实并非是像一条水管那样,他更加像一个TCP包,
到了网络中传输样子.可能很多人脑海中TCP就是一条"水管",但其实不是的).
1, 它的一些限制
A), Kinesis它的数据保存默认是24小时,24小时他会删掉(并非是你读取一个,他就删除一个).当然,你可以在后台设置这个保存时间,
保留时间越长,需要的给的钱越多.最高可以有168小时的保存(7天)
B), Shard是流中数据记录的唯一标识组。一个流由一个或多个Shard组成,每个分片提供一个固定的容量单位。
每个分片均可支持最多每秒5次读取和最多每秒2MB的读取速度. 每秒1000条记录可用于写入和最多每秒1MB的写入速度.
这里就有必要说一下了,官方文档是推荐你,首先计算出自己的业务,大概每一秒,会有多少的数据量,然后根据你数据量,来评估
在创建流的时候,应该建立多少个Shard(至少要1个,即支持每秒5次,2MB/s读取, 1000条记录+1MB/s的写入速度)
具体计算方式,请参考:
https://docs.aws.amazon.com/zh_cn/streams/latest/dev/amazon-kinesis-streams.html
C), 每条数据最大应该是1M.
D), 支持base64编码
官方总结的限制地址
https://docs.aws.amazon.com/zh_cn/streams/latest/dev/service-sizes-and-limits.html
2, 分区键
他是指定将这个数据写入到那个Shard中.然后我们看一下写入流的数据格式
{
"Data":blob,
"ExplicitHashKey": "string",
"PartitionKey": "string",
"SequenceNumberForOrdering": "string",
"StreamName": "string"
}
Data: 即你写入的到流中的数据
ExplicitHashKey: 明确的哈希值.可以不用填写)
PartitionKey: 分区键(一定要填写)
SequenceNumberForOrdering: 这个值非必要填,他是在同一客户端,要写入同一个流时,为了确保你的数据是顺序的,那就要填写这个值(递增的值)
如果你不填写,则按照数据到达他Kinesis的顺序来排列.
StreamName: 流的名字(一定要填写)
当我看到这个API的时候,我懵逼在于,ExplicitHashKey, PartitionKey这个2值.官方是这么解释的这2个值的
ExplicitHashKey
The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash.
Type: String
Pattern: 0|([1-9]\d{0,38})
Required: No
这个明确的Hash值决定这个数据写入到那个Shard
PartitionKey
Determines which shard in the stream the data record is assigned to. Partition keys are Unicode strings with a maximum length limit of 256 characters for each key. Amazon Kinesis uses the partition key as input to a hash function that maps the partition key and associated data to a specific shard. Specifically, an MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. As a result of this hashing mechanism, all data records with the same partition key map to the same shard within the stream.
Type: String
Length Constraints: Minimum length of 1. Maximum length of 256.
Required: Yes
将这个数据关联到那个Shard上.他是一个最大为256位的字符串.然后采用MD5算法去对这个PartitionKey做运算,得到一个128位的整数.然后就知道关联到那个Shard上了
而我对这2个值就懵逼了.一个说是明确的Hash值,你给了他,Kinesis就知道写入那个Shard(那我怎么知道这个Shard的关联值是多少啊)
一个说是对一个字符串做md5运算,然后得到要一个128位的整数,然后就知道他写入那个Shard.
后来,当我看到这个的时候,我就一切都明白了(以下的这些信息.都将会在配套的代码里面能有示例)
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"EndingSequenceNumber": "49569378587012170335082352128586286997053510910950244354",
"StartingSequenceNumber": "49569378587001019962483086817016728063736159450739769346"
}
}
}
看到了HashKeyRange了吗? 每一个Shard都有一个从开始到结束的整数范围.如果你有2个Shard的时候,
他是这样子的(其实中间有数据,我掉了,这里只为了阐述清楚是你的数据怎么关联到指定Shard的)
[{
"ShardId": "shardId-000000000001",
"HashKeyRange": {
"EndingHashKey": "170141183460469231731687303715884105728",
"StartingHashKey": "0"
}
},
{
"ShardId": "shardId-000000000002",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "170141183460469231731687303715884105729"
}
}]
当你只有一个Shard的时候,他的是从0 至 340282366920938463463374607431768211455
当你2个Shard的时候,Shard1是从 0 至今 170141183460469231731687303715884105728
Shard2是从170141183460469231731687303715884105729 至 340282366920938463463374607431768211455
换句来说,当有2个Shard的时候,如果你指定了的ExplicitHashKey 在 0 小于 170141183460469231731687303715884105728他都命中Shard1
在大于170141183460469231731687303715884105728的时候,都关联到Shard2中.
如果是你不指定这值.就相当于采用md5(PartitionKey), 看他命中Shard1, 还是Shard2.(相当于全随机到任意的Shard了)
如果Shard数量越多,那这个HashKeyRange,将会是越分越多段.如果要全随机的朋友,你直接对你的数据做个hash运算,然后得到一个
hash值,把这个hash值填进去PartitionKey即可.
3, 如何将数据写入到流(具体可以看代码,如果用boto3来做,那就是几行代码的事情)
A), 首先去IAM里面申请一个USER,然后拿到Key和Secret.然后在本机的~/.aws/credentials(MAC或者Linux).
访问类型 : 编程访问
权限给 AmazonKinesisFullAccess
接着得到访问ID和私有访问秘钥(就是我上面提到的Key和Secret)
然后把这个配置到去 ~/.aws/credentials
[default]
aws_access_key_id = xxxxx
aws_secret_access_key = xxxx
配置第2个文件 ~/.aws/config
[default]
region=ap-southeast-1
这里表示默认区域是新加坡的意思
B),这里可以分为单个和批量.
C),读取
D),这里有必要说一下.如果写入的数据量在瞬间超过1M(这个测试用例1用了1个Shard.所以他最大写入速度是1MB/S)
<class 'botocore.exceptions.ClientError'>, (u'An error occurred (ProvisionedThroughputExceededException) when calling the PutRecord operation: Rate exceeded for shard shardId-000000000000 in stream MyStream under account xxxxxx.',)
如果说,发生这个异常.一般根据业务逻辑,是需要重新写进去的时候,但是Kinesis是保证FIFO的.如果出了异常.
导致这个没能按照顺序写入(被其后面产生的Log先写进去了)
那恐怕你在处理业务逻辑的时候,就要小心了
或者,你的客户端在写入的时候,如果前面那个没有返回成功,那后面的你先不要写.如果他异常,一会在一起批量写进去
如果你要求性能很高,那我目前没想到太好的办法,你尽量对你的高峰写入值估大点.不要让它抛出异常
或者你自己在处理的时候,自己给每一条数据一个递增的LogNum.那存入数据库后,也可以考这个LogNum来重新排序.
那这个时候.你或许会问,我是有多个客户端同时写入的啊.那我怎么样保证我的LogNum都不同,那这里按照我们以前经验,还是挺简单处理的
给每一个客户端一个clientId, 例如, 1001, 1002.那1001客户端的LogNum都是这样的
1001001, 1011001, 1021001(未4位都是clientId)
示例代码在commom.py的createLogNum
4, 关于读取.
读取,首先他需要一个ShardIterator.而关于这个ShardIterator是指你需要从哪里开始读取Kinesis中的数据.
他可以通过boto3里的API get_shard_iterator 获取回来.这里需要注意的是,他有几种方式来得到开始读取数据的位置
A), AT_SEQUENCE_NUMBER 给一个系列号他,然后从这个系列号开始读(包括这个系列号)
B), AFTER_SEQUENCE_NUMBER 从这个系列号后面开始读
C), TRIM_HORIZON 从系统中第一个未处理的读起
D), LATEST 从最新的开始读起
E), AT_TIMESTAMP 从指定的时间读起
F), 如果ShardIterator. 超时.将会返回类似的异常提示
botocore.exceptions.ClientError: An error occurred (ExpiredIteratorException) when calling the GetRecords operation: Iterator expired. The iterator was created at time Thu Mar 23 03:13:20 UTC 2017 while right now it is Thu Mar 23 03:20:16 UTC 2017 which is further in the future than the tolerated delay of 300000 milliseconds.
G), 我在测试的时候,发现他其实请求一次,能拉取7M数据的.官方文档说一次性最大能给回你10M数据.但是你接下来5S内如果去再次读取.则会发生异常
H), 如果你从已经删掉(超过24小时删除)的系列号开始读,其实也没有问题.只是删除部分的数据你读取不了
I), 虽然说每个Shard是限制每秒2MB/s读取速率,但是如果你调用GetRecords的时候,一次性最大能返回10M的数据
J), 如果你一次调用返回10m数据,你在5秒内,再次调用GetRecords,则出出现ProvisionedThroughputExceededException异常
K), MillisBehindLatest, 这个值表示你距离最新的Record,还有多久(时间意义上的),如果是0,则表示你已经读到最流的最前面了
5, 关于改版Shard数量的
这里需要注意每一个shard的SequenceNumberRange.
最开始假设是这样的
{
"Shards": [
{
"ShardId": "shardId-000000000000",
"SequenceNumberRange": {
"StartingSequenceNumber": "49571568732482609959687117931971140088870254941080387586"
}
}
],
"StreamStatus": "ACTIVE"
}
如果从1个改成2个,他是这样的
{
"Shards": [
{
"ShardId": "shardId-000000000000",
"SequenceNumberRange": {
"EndingSequenceNumber": "49571577983913304452318332263350352993826417086331617282",
"StartingSequenceNumber": "49571568732482609959687117931971140088870254941080387586"
}
},
{
"ShardId": "shardId-000000000001",
"ParentShardId": "shardId-000000000000",
"SequenceNumberRange": {
"StartingSequenceNumber": "49571630929104863445205345313098652477253935219347554322"
}
},
{
"ShardId": "shardId-000000000002",
"ParentShardId": "shardId-000000000000",
"SequenceNumberRange": {
"StartingSequenceNumber": "49571630929127164190403875936240188195526583580853534754"
}
}
],
"StreamStatus": "ACTIVE"
}
接着在改回来1个Shard
{
"Shards": [
{
"ShardId": "shardId-000000000000",
"SequenceNumberRange": {
"EndingSequenceNumber": "49571577983913304452318332263350352993826417086331617282",
"StartingSequenceNumber": "49571568732482609959687117931971140088870254941080387586"
}
},
{
"ShardId": "shardId-000000000001",
"ParentShardId": "shardId-000000000000",
"SequenceNumberRange": {
"EndingSequenceNumber": "49571630929116013817804610624668211410570660301527580690",
"StartingSequenceNumber": "49571630929104863445205345313098652477253935219347554322"
}
},
{
"ShardId": "shardId-000000000002",
"ParentShardId": "shardId-000000000000",
"SequenceNumberRange": {
"EndingSequenceNumber": "49571630929138314563003141247809747128843308663033561122",
"StartingSequenceNumber": "49571630929127164190403875936240188195526583580853534754"
}
},
{
"ShardId": "shardId-000000000003",
"ParentShardId": "shardId-000000000001",
"AdjacentParentShardId": "shardId-000000000002",
"SequenceNumberRange": {
"StartingSequenceNumber": "49571631010145771496665629809439452680058096531401605170"
}
}
],
}
6, 参考文档
API文档
http://docs.aws.amazon.com/zh_cn/kinesis/latest/APIReference/Welcome.html
boto3文档
http://boto3.readthedocs.io/en/latest/reference/services/kinesis.html
7, 总结
A), 写入的时候,如果出现异常.则需要重新尝试写入.
B), 如果有必要写入同一个Shard的,请理解SequenceNumberRange和使用参数ExplicitHashKey控制
我对这个的场景能想到的是,如果一条数据超过1M,要2条以上才能全部写入,那就应该使用这种
C), 读取的时候,官方文档介绍是每一条线程不停的轮询,然后去读取一个Shard.
D), 如果你一个Shard的数据全部读取完毕,他NextShardIterator返回null
E), 控制好读取频率.如果造成异常,请处理.千万别导致当前线程退出
F), 如果你有多个Shard.官方推荐每条线程轮询读取一个Shard
G), 如果你发现这个Shard是含有 EndingSequenceNumber字段的,那应该是旧的.
H),
8, 一些疑问
1, Kinesis虽然看起来像是一条"水管", 但其实他数据必然应该要有一个很强大的DB来存储,这用的是什么存储?
2, 关于Kinesis的sequenceNumber.我对这个值比较好奇,如果是我来做类似Kinesis这样的东西
每个新创建的Stream的都从0开始就行了.但现在从真实情况来看,应该是全局的,即所有的流都共用一个递增的sequenceNumber.
这样做的目的是为了存储的时候,不需要为每条Stream都开一张DB表(我的猜想是可能N个不同用户之间的Stream都共用一个存储表)?
我猜想是这个sequenceNumber是某张表的主Key.
3, 关于读取的时候.需要先得到要一个ShardIterator的迭代器.但其实本质上来说,就是给一个sequenceNumber和一个ShardId.
那我读取完后.我又得知最后一条数据的sequenceNumber, 那我在用sequenceNumber + shardId继续读取后面的不就行了么?
4, 读的时候还涉及到一个逻辑问题.采用describe_stream的时候,他总是会给回这个Stream从创建到现在的SequenceNumberRange,
即使我去读取已经删除的数据,Kinesis也是会返回一个空的List.文档说,并且多次调用后,才会读取到有数据的地方.那即是保存了
一张关于这个Stream的sequenceNumber的表(针对存储数据的那张表来说,这张就像是存的是索引信息了)?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment