Created
March 24, 2017 10:16
-
-
Save Ron3/4caac0c50d54a3c8743a5799b8242e49 to your computer and use it in GitHub Desktop.
AWS Kinesis Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2017年3月20日 by Ron | |
Email: luodm03@qq.com(如有错误, 欢迎指证.如有问题,欢迎讨论) | |
配套代码的git地址: | |
git: git@github.com:Ron3/AWS-Kinesis.git | |
https: https://github.com/Ron3/AWS-Kinesis.git | |
一直以来,我都对Amazon Kinesis Stream(以下称Kinesis)充满着好奇,一直在慢慢啃文档,但相对来说,还是不够仔细的.那既然看了 | |
就写个我对Kinesis的理解与总结吧.我第一次接触这个概念,应该是在第一次参加AWS会议的时候,卓动--张穗文分享了他们如何利用Kinesis做用户 | |
行为分析的.然而,我也正好曾经做过一个类似的,但是我们没有借助Kinesis来做,是写把LOG写成文件,然后采用Rsync同步到另外一台机器,在写入MySQL. | |
由于代码的健壮性写的不太好,总是有一些小问题,后来我也是借助其他类似Kinesis的组件重构掉的. | |
对于没有接触过的同学来说,可能比较好奇这个是什么东西,到底应该怎么样用.那我说说我的场景吧.游戏玩家,在操作的时候,是要统计一些数据来进行 | |
分析的,来改善游戏不足的地方.例如,玩家登录,登出,付费,进入关卡,挑战关卡成功,或者挑战失败等等,这些都是写入到Kinesis,然后在写个应用程序, | |
把数据读取出来,汇总,写入MySQL(如果数据量庞大的用户,可以看看aws的RedShift.这个东西,稍后我应该会去看看).接着在写个Web统计后台,把数据查询 | |
显示出来.而通常用户做一个操作.到我查询后台能看到这些,一般2S内即可. | |
那我们还是从现实具象化去描述Kinesis.这个东西有点像水管,一断不停的写入数据,另外一端不停的读取数据(这样描述不精确的,只是让大家在脑海中有个模型) | |
具体的可以看下图(以下的资料,大部分来自官方文档) | |
图片地址: https://docs.aws.amazon.com/zh_cn/streams/latest/dev/key-concepts.html | |
Streams 高级别架构 | |
Kinesis是一条巨大的"水"管,而数据流入到"水管"中后,他需要一个叫Shard(分片)的东西来承载.Shard才是真正的数据载体. | |
那这Kinesis和Shard之间到底是一怎么样的关系,在我脑海中,就是有点像,一条巨大的电缆里面,还会分为很多条小的电缆,而真正 | |
的数据传输,就是在小的电缆上进行的(但这样描述,应该是不够精确的,因为Shard他其实并非是像一条水管那样,他更加像一个TCP包, | |
到了网络中传输样子.可能很多人脑海中TCP就是一条"水管",但其实不是的). | |
1, 它的一些限制 | |
A), Kinesis它的数据保存默认是24小时,24小时他会删掉(并非是你读取一个,他就删除一个).当然,你可以在后台设置这个保存时间, | |
保留时间越长,需要的给的钱越多.最高可以有168小时的保存(7天) | |
B), Shard是流中数据记录的唯一标识组。一个流由一个或多个Shard组成,每个分片提供一个固定的容量单位。 | |
每个分片均可支持最多每秒5次读取和最多每秒2MB的读取速度. 每秒1000条记录可用于写入和最多每秒1MB的写入速度. | |
这里就有必要说一下了,官方文档是推荐你,首先计算出自己的业务,大概每一秒,会有多少的数据量,然后根据你数据量,来评估 | |
在创建流的时候,应该建立多少个Shard(至少要1个,即支持每秒5次,2MB/s读取, 1000条记录+1MB/s的写入速度) | |
具体计算方式,请参考: | |
https://docs.aws.amazon.com/zh_cn/streams/latest/dev/amazon-kinesis-streams.html | |
C), 每条数据最大应该是1M. | |
D), 支持base64编码 | |
官方总结的限制地址 | |
https://docs.aws.amazon.com/zh_cn/streams/latest/dev/service-sizes-and-limits.html | |
2, 分区键 | |
他是指定将这个数据写入到那个Shard中.然后我们看一下写入流的数据格式 | |
{ | |
"Data":blob, | |
"ExplicitHashKey": "string", | |
"PartitionKey": "string", | |
"SequenceNumberForOrdering": "string", | |
"StreamName": "string" | |
} | |
Data: 即你写入的到流中的数据 | |
ExplicitHashKey: 明确的哈希值.可以不用填写) | |
PartitionKey: 分区键(一定要填写) | |
SequenceNumberForOrdering: 这个值非必要填,他是在同一客户端,要写入同一个流时,为了确保你的数据是顺序的,那就要填写这个值(递增的值) | |
如果你不填写,则按照数据到达他Kinesis的顺序来排列. | |
StreamName: 流的名字(一定要填写) | |
当我看到这个API的时候,我懵逼在于,ExplicitHashKey, PartitionKey这个2值.官方是这么解释的这2个值的 | |
ExplicitHashKey | |
The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. | |
Type: String | |
Pattern: 0|([1-9]\d{0,38}) | |
Required: No | |
这个明确的Hash值决定这个数据写入到那个Shard | |
PartitionKey | |
Determines which shard in the stream the data record is assigned to. Partition keys are Unicode strings with a maximum length limit of 256 characters for each key. Amazon Kinesis uses the partition key as input to a hash function that maps the partition key and associated data to a specific shard. Specifically, an MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. As a result of this hashing mechanism, all data records with the same partition key map to the same shard within the stream. | |
Type: String | |
Length Constraints: Minimum length of 1. Maximum length of 256. | |
Required: Yes | |
将这个数据关联到那个Shard上.他是一个最大为256位的字符串.然后采用MD5算法去对这个PartitionKey做运算,得到一个128位的整数.然后就知道关联到那个Shard上了 | |
而我对这2个值就懵逼了.一个说是明确的Hash值,你给了他,Kinesis就知道写入那个Shard(那我怎么知道这个Shard的关联值是多少啊) | |
一个说是对一个字符串做md5运算,然后得到要一个128位的整数,然后就知道他写入那个Shard. | |
后来,当我看到这个的时候,我就一切都明白了(以下的这些信息.都将会在配套的代码里面能有示例) | |
{ | |
"ShardId": "shardId-000000000000", | |
"HashKeyRange": { | |
"EndingHashKey": "340282366920938463463374607431768211455", | |
"StartingHashKey": "0" | |
}, | |
"SequenceNumberRange": { | |
"EndingSequenceNumber": "49569378587012170335082352128586286997053510910950244354", | |
"StartingSequenceNumber": "49569378587001019962483086817016728063736159450739769346" | |
} | |
} | |
} | |
看到了HashKeyRange了吗? 每一个Shard都有一个从开始到结束的整数范围.如果你有2个Shard的时候, | |
他是这样子的(其实中间有数据,我掉了,这里只为了阐述清楚是你的数据怎么关联到指定Shard的) | |
[{ | |
"ShardId": "shardId-000000000001", | |
"HashKeyRange": { | |
"EndingHashKey": "170141183460469231731687303715884105728", | |
"StartingHashKey": "0" | |
} | |
}, | |
{ | |
"ShardId": "shardId-000000000002", | |
"HashKeyRange": { | |
"EndingHashKey": "340282366920938463463374607431768211455", | |
"StartingHashKey": "170141183460469231731687303715884105729" | |
} | |
}] | |
当你只有一个Shard的时候,他的是从0 至 340282366920938463463374607431768211455 | |
当你2个Shard的时候,Shard1是从 0 至今 170141183460469231731687303715884105728 | |
Shard2是从170141183460469231731687303715884105729 至 340282366920938463463374607431768211455 | |
换句来说,当有2个Shard的时候,如果你指定了的ExplicitHashKey 在 0 小于 170141183460469231731687303715884105728他都命中Shard1 | |
在大于170141183460469231731687303715884105728的时候,都关联到Shard2中. | |
如果是你不指定这值.就相当于采用md5(PartitionKey), 看他命中Shard1, 还是Shard2.(相当于全随机到任意的Shard了) | |
如果Shard数量越多,那这个HashKeyRange,将会是越分越多段.如果要全随机的朋友,你直接对你的数据做个hash运算,然后得到一个 | |
hash值,把这个hash值填进去PartitionKey即可. | |
3, 如何将数据写入到流(具体可以看代码,如果用boto3来做,那就是几行代码的事情) | |
A), 首先去IAM里面申请一个USER,然后拿到Key和Secret.然后在本机的~/.aws/credentials(MAC或者Linux). | |
访问类型 : 编程访问 | |
权限给 AmazonKinesisFullAccess | |
接着得到访问ID和私有访问秘钥(就是我上面提到的Key和Secret) | |
然后把这个配置到去 ~/.aws/credentials | |
[default] | |
aws_access_key_id = xxxxx | |
aws_secret_access_key = xxxx | |
配置第2个文件 ~/.aws/config | |
[default] | |
region=ap-southeast-1 | |
这里表示默认区域是新加坡的意思 | |
B),这里可以分为单个和批量. | |
C),读取 | |
D),这里有必要说一下.如果写入的数据量在瞬间超过1M(这个测试用例1用了1个Shard.所以他最大写入速度是1MB/S) | |
<class 'botocore.exceptions.ClientError'>, (u'An error occurred (ProvisionedThroughputExceededException) when calling the PutRecord operation: Rate exceeded for shard shardId-000000000000 in stream MyStream under account xxxxxx.',) | |
如果说,发生这个异常.一般根据业务逻辑,是需要重新写进去的时候,但是Kinesis是保证FIFO的.如果出了异常. | |
导致这个没能按照顺序写入(被其后面产生的Log先写进去了) | |
那恐怕你在处理业务逻辑的时候,就要小心了 | |
或者,你的客户端在写入的时候,如果前面那个没有返回成功,那后面的你先不要写.如果他异常,一会在一起批量写进去 | |
如果你要求性能很高,那我目前没想到太好的办法,你尽量对你的高峰写入值估大点.不要让它抛出异常 | |
或者你自己在处理的时候,自己给每一条数据一个递增的LogNum.那存入数据库后,也可以考这个LogNum来重新排序. | |
那这个时候.你或许会问,我是有多个客户端同时写入的啊.那我怎么样保证我的LogNum都不同,那这里按照我们以前经验,还是挺简单处理的 | |
给每一个客户端一个clientId, 例如, 1001, 1002.那1001客户端的LogNum都是这样的 | |
1001001, 1011001, 1021001(未4位都是clientId) | |
示例代码在commom.py的createLogNum | |
4, 关于读取. | |
读取,首先他需要一个ShardIterator.而关于这个ShardIterator是指你需要从哪里开始读取Kinesis中的数据. | |
他可以通过boto3里的API get_shard_iterator 获取回来.这里需要注意的是,他有几种方式来得到开始读取数据的位置 | |
A), AT_SEQUENCE_NUMBER 给一个系列号他,然后从这个系列号开始读(包括这个系列号) | |
B), AFTER_SEQUENCE_NUMBER 从这个系列号后面开始读 | |
C), TRIM_HORIZON 从系统中第一个未处理的读起 | |
D), LATEST 从最新的开始读起 | |
E), AT_TIMESTAMP 从指定的时间读起 | |
F), 如果ShardIterator. 超时.将会返回类似的异常提示 | |
botocore.exceptions.ClientError: An error occurred (ExpiredIteratorException) when calling the GetRecords operation: Iterator expired. The iterator was created at time Thu Mar 23 03:13:20 UTC 2017 while right now it is Thu Mar 23 03:20:16 UTC 2017 which is further in the future than the tolerated delay of 300000 milliseconds. | |
G), 我在测试的时候,发现他其实请求一次,能拉取7M数据的.官方文档说一次性最大能给回你10M数据.但是你接下来5S内如果去再次读取.则会发生异常 | |
H), 如果你从已经删掉(超过24小时删除)的系列号开始读,其实也没有问题.只是删除部分的数据你读取不了 | |
I), 虽然说每个Shard是限制每秒2MB/s读取速率,但是如果你调用GetRecords的时候,一次性最大能返回10M的数据 | |
J), 如果你一次调用返回10m数据,你在5秒内,再次调用GetRecords,则出出现ProvisionedThroughputExceededException异常 | |
K), MillisBehindLatest, 这个值表示你距离最新的Record,还有多久(时间意义上的),如果是0,则表示你已经读到最流的最前面了 | |
5, 关于改版Shard数量的 | |
这里需要注意每一个shard的SequenceNumberRange. | |
最开始假设是这样的 | |
{ | |
"Shards": [ | |
{ | |
"ShardId": "shardId-000000000000", | |
"SequenceNumberRange": { | |
"StartingSequenceNumber": "49571568732482609959687117931971140088870254941080387586" | |
} | |
} | |
], | |
"StreamStatus": "ACTIVE" | |
} | |
如果从1个改成2个,他是这样的 | |
{ | |
"Shards": [ | |
{ | |
"ShardId": "shardId-000000000000", | |
"SequenceNumberRange": { | |
"EndingSequenceNumber": "49571577983913304452318332263350352993826417086331617282", | |
"StartingSequenceNumber": "49571568732482609959687117931971140088870254941080387586" | |
} | |
}, | |
{ | |
"ShardId": "shardId-000000000001", | |
"ParentShardId": "shardId-000000000000", | |
"SequenceNumberRange": { | |
"StartingSequenceNumber": "49571630929104863445205345313098652477253935219347554322" | |
} | |
}, | |
{ | |
"ShardId": "shardId-000000000002", | |
"ParentShardId": "shardId-000000000000", | |
"SequenceNumberRange": { | |
"StartingSequenceNumber": "49571630929127164190403875936240188195526583580853534754" | |
} | |
} | |
], | |
"StreamStatus": "ACTIVE" | |
} | |
接着在改回来1个Shard | |
{ | |
"Shards": [ | |
{ | |
"ShardId": "shardId-000000000000", | |
"SequenceNumberRange": { | |
"EndingSequenceNumber": "49571577983913304452318332263350352993826417086331617282", | |
"StartingSequenceNumber": "49571568732482609959687117931971140088870254941080387586" | |
} | |
}, | |
{ | |
"ShardId": "shardId-000000000001", | |
"ParentShardId": "shardId-000000000000", | |
"SequenceNumberRange": { | |
"EndingSequenceNumber": "49571630929116013817804610624668211410570660301527580690", | |
"StartingSequenceNumber": "49571630929104863445205345313098652477253935219347554322" | |
} | |
}, | |
{ | |
"ShardId": "shardId-000000000002", | |
"ParentShardId": "shardId-000000000000", | |
"SequenceNumberRange": { | |
"EndingSequenceNumber": "49571630929138314563003141247809747128843308663033561122", | |
"StartingSequenceNumber": "49571630929127164190403875936240188195526583580853534754" | |
} | |
}, | |
{ | |
"ShardId": "shardId-000000000003", | |
"ParentShardId": "shardId-000000000001", | |
"AdjacentParentShardId": "shardId-000000000002", | |
"SequenceNumberRange": { | |
"StartingSequenceNumber": "49571631010145771496665629809439452680058096531401605170" | |
} | |
} | |
], | |
} | |
6, 参考文档 | |
API文档 | |
http://docs.aws.amazon.com/zh_cn/kinesis/latest/APIReference/Welcome.html | |
boto3文档 | |
http://boto3.readthedocs.io/en/latest/reference/services/kinesis.html | |
7, 总结 | |
A), 写入的时候,如果出现异常.则需要重新尝试写入. | |
B), 如果有必要写入同一个Shard的,请理解SequenceNumberRange和使用参数ExplicitHashKey控制 | |
我对这个的场景能想到的是,如果一条数据超过1M,要2条以上才能全部写入,那就应该使用这种 | |
C), 读取的时候,官方文档介绍是每一条线程不停的轮询,然后去读取一个Shard. | |
D), 如果你一个Shard的数据全部读取完毕,他NextShardIterator返回null | |
E), 控制好读取频率.如果造成异常,请处理.千万别导致当前线程退出 | |
F), 如果你有多个Shard.官方推荐每条线程轮询读取一个Shard | |
G), 如果你发现这个Shard是含有 EndingSequenceNumber字段的,那应该是旧的. | |
H), | |
8, 一些疑问 | |
1, Kinesis虽然看起来像是一条"水管", 但其实他数据必然应该要有一个很强大的DB来存储,这用的是什么存储? | |
2, 关于Kinesis的sequenceNumber.我对这个值比较好奇,如果是我来做类似Kinesis这样的东西 | |
每个新创建的Stream的都从0开始就行了.但现在从真实情况来看,应该是全局的,即所有的流都共用一个递增的sequenceNumber. | |
这样做的目的是为了存储的时候,不需要为每条Stream都开一张DB表(我的猜想是可能N个不同用户之间的Stream都共用一个存储表)? | |
我猜想是这个sequenceNumber是某张表的主Key. | |
3, 关于读取的时候.需要先得到要一个ShardIterator的迭代器.但其实本质上来说,就是给一个sequenceNumber和一个ShardId. | |
那我读取完后.我又得知最后一条数据的sequenceNumber, 那我在用sequenceNumber + shardId继续读取后面的不就行了么? | |
4, 读的时候还涉及到一个逻辑问题.采用describe_stream的时候,他总是会给回这个Stream从创建到现在的SequenceNumberRange, | |
即使我去读取已经删除的数据,Kinesis也是会返回一个空的List.文档说,并且多次调用后,才会读取到有数据的地方.那即是保存了 | |
一张关于这个Stream的sequenceNumber的表(针对存储数据的那张表来说,这张就像是存的是索引信息了)? |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment