Skip to content

Instantly share code, notes, and snippets.

@haje01
Last active December 17, 2023 03:52
Show Gist options
  • Save haje01/5abaeadda792b40f39d7 to your computer and use it in GitHub Desktop.
Save haje01/5abaeadda792b40f39d7 to your computer and use it in GitHub Desktop.

Kinesis로 App 로그 다루기

글쓴이: 김정주(haje01@gmail.com)

이 글은 AWS 블로그를 참고하고 추가/보완하여 작성되었습니다.


Kinesis는 아마존 웹서비스(AWS)에서 제공하는 실시간 데이터 처리기이다.

여기에서는 AWS Kinesis를 사용하여 App에서 로그를 수집하는 방법을 살펴본다.

특징

Kinesis는 다음과 같은 특징이 있다.

장점

  • 스트림만 생성하면 바로 데이터 수집이 가능
  • 스케일 인/아웃이 자유롭다.
  • 인증을 통한 네크워킹

제약/단점

  • 스트림 당 25샤드(Shard)까지 가질 수 있다(별도 계약으로 더 쓸 수 있음)
  • SDK를 사용할 수 없다면 HTTPS로 데이터를 보내야 함
  • 쓰기: 한 샤드에 대해 초당 1000레코드, 1MB크기
  • 읽기: 한 샤드에 대해 초당 5트랜잭션, 2MB크기

초당 쓰기 레코드 수를 늘리려면 샤드를 늘리거나(비용 증가), 뒤에서 설명할 모아(Collect) 보내기를 이용해야한다.

스케일링 및 HA

  • 샤드의 Split/Merge를 통해 구현. 유저가 폭증할 경우는 AWS측과 최대 샤드 수 증가 협의 필요
  • HA는 기본적으로 지원

Kinesis 이용 방법

다음과 같은 방법이 있다.

검토해본 결과, 일반적으로 사용하기에 SDK가 무난한 것 같다.

Kinesis 기초

준비

Kinesis는 다양한 툴과 랭귀지를 지원하지만, 여기에서는 aws cli 와 Python SDK(boto)을 이용해 진행하겠다. AWS 콘솔페이지에서 KinesisFullAccess 권한이 있는 IAM 계정을 만들고, aws cli 설치 후 다음과 같이 정보를 설정한다.

$ aws configure
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Default region name [None]: ap-northeast-1
Default output format [None]: json

스트림 생성하고 정보보기

다음과 같이 aws cli를 사용하여 새 스트림을 생성할 수 있다. (이름은 'KinesisTest', 샤드 수는 1로 한다.)

$ aws kinesis create-stream --stream-name=KinesisTest --shard-count=1

이제 KinesisTest스트림이 생성되었다.

스트림의 정보는 다음과 같이 볼 수있다.

 $ aws kinesis describe-stream --stream-name=KinesisTest
 
{
    "StreamDescription": {
        "RetentionPeriodHours": 24, 
        "StreamStatus": "ACTIVE", 
        "StreamName": "KinesisTest", 
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:415742736303:stream/KinesisTest", 
        "Shards": [
            {
                "ShardId": "shardId-000000000000", 
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455", 
                    "StartingHashKey": "0"
                }, 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49558153526886807271208233148040194081103188177553719298"
                }
            }
        ]
    }
}

스트림의 상태(Status)가 ACTIVE면 데이터를 전송할 수 있다. 샤드가 하나 있고 Id는 shardId-000000000000인 것을 확인할 수 있다.

AWS콘솔 페이지에서도 다음과 같이 확인할 수 있다.

new stream

스트림으로 데이터 보내기(put)

생성된 스트림으로 데이터를 보내겠다. Kinesis에서는 데이터를 '레코드'라고 칭한다. 데이터의 단위라고 이해하면 되겠다.

다음과 같이 put.py파일을 생성한다. 여기에서 testdata모듈은 임의의 유저 데이터를 생성해주기 위한 용도이다.

import json

import testdata
from boto import kinesis

kinesis = kinesis.connect_to_region('ap-northeast-1')


# Kinesis에 넣을 가짜 유저데이터 생성기
class Users(testdata.DictFactory):
    firstname = testdata.FakeDataFactory('firstName')
    lastname = testdata.FakeDataFactory('lastName')
    age = testdata.RandomInteger(10, 30)
    gender = testdata.RandomSelection(['female', 'male'])


for user in Users().generate(50):
    print(user)
    # 스트림에 레코드를 하나씩 PUT
    kinesis.put_record('KinesisTest', json.dumps(user), 'partitionkey')

실행한다.

$ python put.py
{'lastname': 'Hansen', 'age': 19, 'firstname': 'Lou', 'gender': 'male'}
{'lastname': 'Hahn', 'age': 23, 'firstname': 'Justice', 'gender': 'female'}
{'lastname': 'Kozey', 'age': 29, 'firstname': 'Dalton', 'gender': 'male'}
{'lastname': 'Blick', 'age': 26, 'firstname': 'Missouri', 'gender': 'female'}
...

가상의 유저 데이터가 JSON 형태로 Kinesis 스트림에 보내지는 것을 확인할 수 있다.

put_record에서 사용되는 파티션키는 샤드가 여럿일 경우 샤드를 결정하는 데 사용된다. 현재는 샤드가 하나 밖에 없어서 파티션키를 지정해도 의미가 없다.

스트림에서 데이터 꺼내기(get)

스트림에 저장된 레코드는 유효 기간(기본 24시간)이 지나면 사라진다. 스트림에서 데이터를 얻어와 S3나 DynamoDB와 같은 영속적인 저장소에 저장해야 하는데, 이 과정은 사용자가 직접 구현하고, EC2 같은 서버에서 실행되어야 한다. 이런 프로그램을 Kinesis Consumer(또는 Stream) Application이라고 한다. (FireHose를 사용하면 필요가 없으나 아직 모든 리전에서 사용할 수 없다)

다음과 같이 get.py를 생성한다.

import json
from boto import kinesis
import time

kinesis = kinesis.connect_to_region('ap-northeast-1')
shard_it = kinesis.get_shard_iterator('KinesisTest', 'shardId-000000000000', 'LATEST')['ShardIterator']

while True:
    out = kinesis.get_records(shard_it, limit=2)
    print(out['Records'])
    shard_it = out['NextShardIterator']
    time.sleep(0.2)

실행하면 다음과 같이 빈 Records만 나온다.

$ python get.py 
[]
[]
[]
...

이것은 kinesis.get_shard_iteratorLATEST옵션이 가장 최근의 레코드를 표시하는 옵션이어서 그렇다. (이외에도 다양한 옵션이 있다.) get.py를 실행한 상태에서 새로운 터미널을 열고 put.py를 다시 실행해보면 데이터가 받아지는 것을 확인할 수 있다.

$ python get.py 
[]
[]
[{u'Data': u'{"lastname": "Heaney", "age": 11, "firstname": "Niko", "gender": "male"}', u'PartitionKey': u'partitionkey', u'ApproximateArrivalTimestamp': 1452576997.186, u'SequenceNumber': u'49558153526886807271208233148423423565921135851976261634'}]
...

샤드 이용하기

Kinesis의 강점은 실시간으로 샤드를 나누거나 합쳐서, 변화하는 데이터 양에 대응할 수 있다는 점이다.

샤드 분리(split)

데이터가 몰리면 부하를 분산하기 위해서 더 많은 샤드가 필요하다. 새로운 샤드는 기존의 샤드를 두 개로 분리하여 생성하게 된다. 이때 기존 샤드를 부모 샤드, 새로이 생성된 샤드를 자식 샤드라고 한다. 샤드 나누기도 aws cli를 통해서 할 수 있으나, 자식 샤드의 시작 해쉬값을 부모 샤드에 할당된 범위에서 계산해야 하기에 스크립트를 만들어서 하겠다.

다음과 같이 split.py를 생성한다.

from boto import kinesis

kinesis = kinesis.connect_to_region('ap-northeast-1')
sinfo = kinesis.describe_stream('KinesisTest')
hkey = int(sinfo['StreamDescription']['Shards'][0]['HashKeyRange']
           ['EndingHashKey'])

shard_id = 'shardId-000000000000'
kinesis.split_shard('KinesisTest', shard_id, str(hkey/2))

실행하면 첫 번째 샤드를 분리하게 된다.

$ python split.py

샤드 분리에는 시간이 좀 걸리는데, 이 동안에도 put.py를 실행해 데이터를 보낼 수 있다.

부모 샤드는 자식 샤드가 완전히 초기화 될 때까지 계속 레코드를 받을 수 있다. 자식 샤드의 초기화가 끝난 후 새로운 레코드는 자식 샤드로 들어간다. 부모 샤드의 기존 레코드는 유효 기간이 남아있는 한 읽기가 가능하며, 자기가 가진 모든 레코드의 유효 기간이 끝날 때 부모 샤드는 사라지게 된다.

실재로 샤드 분리 명령을 내리고 바로 put.py를 실행해보면(먼저 아래의 '파티션 키 지정해서 보내기' 작업 필요), 1번 샤드로 데이터가 가다가, 잠시 후 2번, 3번 샤드로 가는 것을 확인할 수 있었다.

샤드 분리 명령 잠시 후에 다시 스트림 정보를 살펴보면, 다음과 같이 2번 샤드( shardId-000000000001)와 3번 샤드(shardId-000000000002)샤드가 생성된 것을 확인할 수 있다.

$ aws describe-stream --stream-name=KinesisTest

...
"Shards": [
    {
        "ShardId": "shardId-000000000000", 
        ...
    }, 
    {
        "ShardId": "shardId-000000000001", 
        ...
    }, 
    {
        "ShardId": "shardId-000000000002", 
        ...
    }
]
...

파티션 키 지정해서 보내기

이제 샤드가 2개가 되기에 레코드를 보낼 때 의미있는 파티션 키를 사용하겠다. 일단 유저의 성별을 파티션 정보로 사용하도록 한다. 기존의 put.py 파일에서 put_record 부분을 다음과 같이 고친다.

pinfo = kinesis.put_record('KinesisTest', json.dumps(user), str(hash(user['gender'])))
print(pinfo)

실행해보면,

$ python sput.py 
{'lastname': 'Breitenberg', 'age': 17, 'firstname': 'Nickolas', 'gender': 'male'}
{u'ShardId': u'shardId-000000000002', u'SequenceNumber': u'49558150195735293985894932004291066377443943335107493922'}
{'lastname': "O'Keefe", 'age': 24, 'firstname': 'Joseph', 'gender': 'male'}
{u'ShardId': u'shardId-000000000002', u'SequenceNumber': u'49558150195735293985894932004292275303263557964282200098'}
{'lastname': 'Haley', 'age': 23, 'firstname': 'Blanca', 'gender': 'female'}
{u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49558150195712993240696401381151948510810524231950925842'}

유저의 성별에 따라서 다른 샤드로 보내지는 것을 알 수 있다.

샤드 병합(merge)

비용 절감을 위해서 데이터의 양이 줄어들면 샤드를 같이 줄이는 것이 필요하다. 다음의 명령으로 2, 3번 째 샤드를 합친다.

$ aws kinesis merge-shards --stream-name=KinesisTest --shard-to-merge=shardId-000000000001 --adjacent-shard-to-merge=shardId-000000000002

다시 스트림의 상태를 보면 다음과 같이 나온다.

$ aws kinesis describe-stream --stream-name=KinesisTest

        ...
        "Shards": [
            {
                "ShardId": "shardId-000000000000", 
                ...
            }, 
            {
                "ShardId": "shardId-000000000001",
                ...
            }, 
            {
                "ShardId": "shardId-000000000002", 
                ...
            }, 
            {
                "ShardId": "shardId-000000000003", 
                ...
            }
        ]
    }
}

2, 3번 샤드가 합쳐진 4번 샤드(shardId-000000000003)가 새로 생성된 것을 알 수 있다.

Kinesis 고급

여기에서는 Kinesis를 제대로 활용하기 위한 테크닉을 살펴보겠다.

파티션키 개선하기

실무에서 샤드 확장은 높은 확률로 발생하기에, 처음부터 파티션키를 샤드의 갯수보다 넉넉히 주는 것이 좋다. (Kinesis는 MD5 Hash함수를 사용해 파티션 키를 128bit 정수값으로 변환하여 샤드 지정에 사용한다)

예제에서 사용한 유저 데이터의 경우 age 항목이 적절한 파티션키가 될 것이다.

한 번에 더 많은 레코드를 보내기

한 번에 하나씩 레코드를 보내는 것 보다, 다수의 레코드를 모아 보내면 퍼포먼스도 좋고 비용도 절감할 수 있다. AWS에서는 이것을 Collection이라고 한다. (비슷한 Batching개념으로 Aggregation이 있으나, 이는 KPL을 통해서 사용해야 할 것이다) put_records를 사용하여 다음과 같이 put.py를 개선할 수 있다.

import json

import testdata
from boto import kinesis

kinesis = kinesis.connect_to_region('ap-northeast-1')


class Users(testdata.DictFactory):
    firstname = testdata.FakeDataFactory('firstName')
    lastname = testdata.FakeDataFactory('lastName')
    age = testdata.RandomInteger(10, 30)
    gender = testdata.RandomSelection(['female', 'male'])


for i in range(5):
    records = []
    for user in Users().generate(10):
        record = {'Data': json.dumps(user), 'PartitionKey':
                  str(hash(user['age']))}
        records.append(record)
    pinfo = kinesis.put_records(records, 'KinesisTest')
    print pinfo

이렇게 하면 한 번 접속에 많은 레코드를 보내기에, 빠른 속도로 전송된다.

KCL(Kinesis Client Library)로 데이터 꺼내기

기존 샤드를 분리하면 자식 샤드가 생기는데, 데이터의 순서가 중요하다면 데이터를 꺼낼 때 다음과 같이 해야한다.

  1. 부모 샤드에 처리되지 않고 남아있는 레코드를 먼저 처리해 준다.
  2. 부모 샤드 처리 후 자식 샤드에 대해 처리한다.

또, 샤드 분리와 병합이 잦은 경우, 샤드Id로 이터레이터를 얻어와 처리하는 과정이 좀 번거롭다.

이런 이유들로 서비스에 적용할 때는 SDK가 아닌 KCL(Kinesis Client Library)을 사용하면 편리할 것이다. (여기에서 Client는 Client/Server의 그것과 다름에 주의하자) KCL은 다음과 같은 특징이 있다.

  • 복수의 워커를 띄워 병렬 처리 (샤드에 변경이 있는 경우 그 상태를 DynamoDB에 저장하고 워커들과 공유한다)
  • 부모 샤드의 레코드를 처리 후 자식 샤드로 넘어가 순서를 보장한다.
  • KCL은 기본적으로 Java 라이브러리이고, 다른 언어(Python, .NET, Ruby 등)를 위한 랩퍼를 제공한다. 따라서 Java가 설치되어 있어야 한다.

(KCL을 사용하는 Consumer Application을 KCL Application이라고도 한다)

파이썬용 KCL 을 이용면 다음과 같은 형식의 코드가 된다.

from amazon_kclpy import kcl
import json, base64

class RecordProcessor(kcl.RecordProcessorBase):

    def initialize(self, shard_id):
        pass

    def process_records(self, records, checkpointer):
        pass

    def shutdown(self, checkpointer, reason):
        pass

if __name__ == "__main__":
    kclprocess = kcl.KCLProcess(RecordProcessor())
    kclprocess.run()

여기에서 process_records함수를 구현하여, S3에 저장하는 등의 작업을 처리한다.

마무리

시스템 구성도

Kinesis를 사용하는 시스템은 일반적으로 다음과 같이 구성될 것이다. (DynamoDB부분은 KCL에서 자동으로 사용)

deploy

보안 문제

App에서 Kinesis 스트림으로 데이터를 보내기 위해서는 AWS Credential 정보가 필요하다. PC나 모바일 디바이스의 경우, 역 엔지니어링을 통해 App에 내장된 Credential이 누출되어 어뷰징될 가능성이 있기에 Cognito를 사용하여 임시 계정을 발급받아 사용하는 것이 바람직하다. AWS에서 제공하는 모바일 SDK들은 이런 방식을 채택하고 있다.

모니터링

실재 서비스에서는 AWS 콘솔페이지에서 Kinesis 스트림의 상태를 모니터링 하면서 필요에 따라 샤딩을 이용하자.

kinesis monitoring

스트림 삭제

필요없게된 스트림은 삭제한다.

$ aws kinesis delete-stream --stream-name=KinesisTest

참조

http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html http://www.slideshare.net/zenos2408/aws-tajo?qid=426c3f19-d837-4619-823f-542306e60a31&v=qf1&b=&from_search=1 http://aws.amazon.com/ko/cognito/ https://dzone.com/articles/amazon-kinesis-is-20x-cheaper-when-used-with-the-k http://boto.cloudhackers.com/en/latest/ref/kinesis.html http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-after-resharding.html


감사합니다.

@channy
Copy link

channy commented Jan 14, 2016

Kinesis shard의 split 및 merge를 auto scaling과 유사한 방식으로 하려면, https://github.com/awslabs/amazon-kinesis-scaling-utils 를 사용하시면 됩니다.

@riceluxs1t
Copy link

python 의 KCL 버전이 있다는건 몰랐네요 감사합니다. 또한 @channy 해당 부분도 몰랐네요 감사합니다.

@haje01
Copy link
Author

haje01 commented Jan 15, 2016

@channy 알려주신 유틸리티도 살펴보겠습니다. 감사합니다.

@ppyong
Copy link

ppyong commented Apr 8, 2020

안녕하세요 꽤 오래전 글인데 ㅜㅜ 궁금한게 있어서 댓글로 남겨요. 혹시 답변 주실까 희망을 가지고 남깁니다.

파티션키라는거의 의미를 잘 모르겠는데요. 어떤 샤드로 데이터를 보내는지 결정하는 부분 같은데요.

예로 제가 5가지 타입이 있을 경우 이 타입 값을 파티션으로 하여 데이터를 전송할 경우

다섯개의 샤드에 퍼지게 되는건가요? 이렇게 퍼지는게 무슨의미가 있나해서요..

빼올때 특정 샤드만 지정해서 빼오면 제가 원하는 타입의 데이터만 가져올 수 있는 효과인가요?

그렇다면 제가 원하는 타입이 어떤 샤드에 있는지는 어떻게 알 수 있을까요..?

ㅜㅜ 질문이 난해했다면 죄송합니다

@nephtyws
Copy link

위 질문에 지나가다가 답 남겨봅니다.

Kinesis에서 한 shard가 처리할 수 있는 량은 1 MiB / 1000 records per second 입니다. 즉, 초당 1000+ 개 이상의 record가 들어올 때를 대비해서 shard를 여러 개를 쓰는 거고, 이때 어떤 샤드로 데이터를 보낼지를 지정하기 위해서 partition key라는 개념을 씁니다. Kinesis는 partition key를 만들 때 사용자가 지정해준 값에 MD5 연산을 해서 128bit signed integer로 만듭니다. 즉, 적절한 partition key를 지정해주어야 각 shard 간에 데이터를 고르게 뿌려줄 수 있습니다. 예를 들어, 사용자 ID나 이름을 partition key로 하는 경우에는 해당 사용자가 요청을 많이 보내면 특정 shard로 요청이 몰리기 때문에 sharding을 하는 의미가 조금 희석됩니다. 그러나 unique한 값으로 partition key를 해주는 경우에는 shard 전반에 데이터를 뿌려줄 수 있어서 올바른 sharding 예라고 할 수 있습니다.

그리고 Kinesis를 사용할 때 특정 shard에 내가 원하는 값만 넣어주는 것은 불가능합니다. 사실 불가능한 건 아니고 ExplicitHashKey 라는 parameter로 특정 데이터에 대한 shard key를 정해줄 수 있는데 (각 shard는 고유의 start ~ end key range를 갖고 있음) 이걸 사용하게 되면 Kinesis가 해주는 sharding stratergy의 이점을 볼 수 없어서 사실상 무의미하고, 데이터 타입별로 데이터를 저장할 때 가장 좋은 방법은 stream을 나누는 것입니다. 이건 Kinesis와 비슷한 concept을 가지고 있는 Kafka에서도 유효합니다. (Topic -> Stream, Partition -> Shard 라고 했을 때, Kafka에서도 특정 partition 안에 원하는 타입의 데이터만 넣을 수 없음. topic을 분리하는 게 좋음)

@ppyong
Copy link

ppyong commented May 3, 2020

이제서야 달아주신 댓글을 봤네요. 늦었지만 nephtyws 님 정말 감사합니다 ^^

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment