Python + Kinesis
{"date": "2015-05-03 10:36:43", "result": "106.179.89.159 - - [03/May/2015:10:36:43 +0000] \"GET / HTTP/1.1\" 403 3839 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"}
{"date": "2015-05-03 10:36:43", "result": "106.179.89.159 - - [03/May/2015:10:36:43 +0000] \"GET /icons/apache_pb2.gif HTTP/1.1\" 304 - \"http://52.68.97.208/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"}
{"date": "2015-05-03 10:36:44", "result": "106.179.89.159 - - [03/May/2015:10:36:44 +0000] \"GET / HTTP/1.1\" 403 3839 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"}
{"date": "2015-05-03 10:36:44", "result": "106.179.89.159 - - [03/May/2015:10:36:44 +0000] \"GET /icons/apache_pb2.gif HTTP/1.1\" 304 - \"http://52.68.97.208/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"}
{"date": "2015-05-03 10:36:46", "result": "106.179.89.159 - - [03/May/2015:10:36:46 +0000] \"GET / HTTP/1.1\" 403 3839 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"}
#!/usr/bin/python
# -*- coding: utf-8 -*-
import boto
import boto.kinesis
import datetime
import time
import re
import json
import os.path
import calendar
from boto.s3.key import Key

# Destination S3 bucket for the logs processed from Kinesis (any bucket name will do)
s3 = boto.connect_s3()
bucket = s3.get_bucket("masayuki-emr")
k = Key(bucket)
dt = ""

# Connection settings for Kinesis (set the region and the stream name)
connection = boto.kinesis.connect_to_region('ap-northeast-1')
stream_name = 'masayuki'
stream = connection.describe_stream(stream_name)
shards = stream['StreamDescription']['Shards'][0]['ShardId']
kinesis_iterator = connection.get_shard_iterator(stream_name, shards, 'LATEST')

next_iterator = None
response = None
while True:
    if next_iterator is None:
        next_iterator = kinesis_iterator['ShardIterator']
    else:
        next_iterator = response['NextShardIterator']
    response = connection.get_records(next_iterator, limit=1)
    if len(response['Records']) != 0:
        result = response['Records'][0]['Data']

        # Convert the date format (e.g. "03/May/2015:10:36:43 +0000" -> "2015-05-03 10:36:43")
        resultArray = result.split(' [')
        dateTime = resultArray[1].split(']')[0]
        dateArray = dateTime.split('/')
        timeArray = dateArray[2].split(':')
        for i, v in enumerate(calendar.month_abbr):
            if dateArray[1] == v:
                dateMonth = i
        eDatetime = timeArray[0] + '-' + str('%02d' % dateMonth) + '-' + dateArray[0] + ' ' \
            + timeArray[1] + ':' + timeArray[2] + ':' + re.match(r'(.*) ', timeArray[3]).group(1)

        od = dt
        d = datetime.datetime.today()
        # One log file per minute
        dt = '%s%s%s%s%s' % (d.year, str('%02d' % d.month), str('%02d' % d.day), str('%02d' % d.hour), str('%02d' % d.minute))
        if od != dt:
            # A new minute has started: upload the previous minute's file to S3
            logfile = 'access_log.' + od
            if os.path.exists(logfile):
                print logfile
                k.key = logfile
                k.set_contents_from_filename(logfile)

        fi = open('access_log.' + dt, 'a')
        # Reshape the record into JSON
        tmpjson = json.dumps({'date': eDatetime, 'result': result})
        fi.write(tmpjson + "\n")
        # fi.write(eDatetime + "\t" + result + "\t" + "TEST\n")
        fi.close()

    # Records are fetched one at a time, so throttle to stay within the stream's throughput
    time.sleep(0.2)
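The month lookup via calendar.month_abbr above can also be done with datetime.strptime; a minimal sketch of the same timestamp conversion, assuming the default C locale so %b matches English month abbreviations (the "+0000" offset is dropped, since Python 2's strptime does not parse %z):

from datetime import datetime

def to_mysql_datetime(apache_ts):
    # e.g. "03/May/2015:10:36:43 +0000" -> "2015-05-03 10:36:43"
    ts = apache_ts.split(' ')[0]  # drop the timezone offset
    return datetime.strptime(ts, '%d/%b/%Y:%H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')

print to_mysql_datetime('03/May/2015:10:36:43 +0000')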
#!/usr/bin/python
# -*- coding: utf-8 -*-
import boto
import boto.kinesis
import datetime
import time
import re
import os.path
import calendar
from boto.s3.key import Key

# Destination S3 bucket for the logs processed from Kinesis (any bucket name will do)
s3 = boto.connect_s3()
bucket = s3.get_bucket("masayuki-emr")
k = Key(bucket)
dt = ""

# Connection settings for Kinesis (set the region and the stream name)
connection = boto.kinesis.connect_to_region('ap-northeast-1')
stream_name = 'masayuki'
stream = connection.describe_stream(stream_name)
shards = stream['StreamDescription']['Shards'][0]['ShardId']
kinesis_iterator = connection.get_shard_iterator(stream_name, shards, 'LATEST')

next_iterator = None
response = None
while True:
    if next_iterator is None:
        next_iterator = kinesis_iterator['ShardIterator']
    else:
        next_iterator = response['NextShardIterator']
    response = connection.get_records(next_iterator, limit=1)
    if len(response['Records']) != 0:
        result = response['Records'][0]['Data']

        # Convert the date format (e.g. "03/May/2015:11:06:02 +0000" -> "2015-05-03 11:06:02")
        resultArray = result.split(' [')
        dateTime = resultArray[1].split(']')[0]
        dateArray = dateTime.split('/')
        timeArray = dateArray[2].split(':')
        for i, v in enumerate(calendar.month_abbr):
            if dateArray[1] == v:
                dateMonth = i
        eDatetime = timeArray[0] + '-' + str('%02d' % dateMonth) + '-' + dateArray[0] + ' ' \
            + timeArray[1] + ':' + timeArray[2] + ':' + re.match(r'(.*) ', timeArray[3]).group(1)

        od = dt
        d = datetime.datetime.today()
        # One SQL file per minute
        dt = '%s%s%s%s%s' % (d.year, str('%02d' % d.month), str('%02d' % d.day), str('%02d' % d.hour), str('%02d' % d.minute))
        if od != dt:
            # A new minute has started: upload the previous minute's file to S3
            logfile = 'sql.' + od + '.sql'
            if os.path.exists(logfile):
                print logfile
                k.key = logfile
                k.set_contents_from_filename(logfile)

        fi = open('sql.' + dt + '.sql', 'a')
        # Strip the unneeded double quotes
        logtext = result.replace('"', '')
        fi.write('INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "' + eDatetime + '","' + logtext + '");\n')
        fi.close()

    # Records are fetched one at a time, so throttle to stay within the stream's throughput
    time.sleep(0.2)
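The replace() above simply deletes double quotes from the log line. If the request line could ever contain single quotes or backslashes as well, a slightly more defensive version might escape rather than delete; a sketch only, assuming MySQL-style backslash escaping (not part of the original script):

def escape_for_sql(text):
    # Escape backslashes first, then both quote characters
    return text.replace('\\', '\\\\').replace('"', '\\"').replace("'", "\\'")

logtext = escape_for_sql(result)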
#!/usr/bin/python
# -*- coding: utf-8 -*-
import boto.kinesis
import threading, Queue, subprocess, sys

tailq = Queue.Queue(maxsize=10)

def tail_forever(fn):
    # Follow the log file with "tail -f" and push each line onto the queue
    p = subprocess.Popen(["tail", "-f", fn], stdout=subprocess.PIPE)
    while 1:
        line = p.stdout.readline()
        tailq.put(line)
        if not line:
            break

def main():
    fn = sys.argv[1]
    threading.Thread(target=tail_forever, args=(fn,)).start()

    # Connection settings for Kinesis (set the region, stream name and partition key)
    connection = boto.kinesis.connect_to_region('ap-northeast-1')
    stream_name = 'masayuki'
    partition_key = 'kinesis-sample'
    while True:
        print connection.put_record(stream_name, tailq.get(), partition_key)

if __name__ == '__main__':
    main()
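For reference, the same put_record call can be made with the newer boto3 SDK; a sketch only, assuming boto3 is installed, reusing the stream name and partition key above, and simply pushing an existing file's lines instead of tailing:

import boto3

# Sketch: put each line of an existing file to the same stream via boto3
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
with open('/var/log/dmesg') as f:
    for line in f:
        print kinesis.put_record(StreamName='masayuki', Data=line, PartitionKey='kinesis-sample')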
INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "2015-05-03 11:06:02","106.179.89.159 - - [03/May/2015:11:06:02 +0000] GET / HTTP/1.1 403 3839 - Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36
");
INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "2015-05-03 11:06:02","106.179.89.159 - - [03/May/2015:11:06:02 +0000] GET /icons/apache_pb2.gif HTTP/1.1 304 - http://52.68.97.208/ Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36
");
INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "2015-05-03 11:06:04","106.179.89.159 - - [03/May/2015:11:06:04 +0000] GET / HTTP/1.1 403 3839 - Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36
");
INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "2015-05-03 11:06:04","106.179.89.159 - - [03/May/2015:11:06:04 +0000] GET /icons/apache_pb2.gif HTTP/1.1 304 - http://52.68.97.208/ Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36
");
[Work performed on an EC2 instance]
1. Up to configuring credentials
# sudo yum update -y
# sudo easy_install boto
# vim ~/.boto  (set the credentials)
[Credentials]
aws_access_key_id = *********
aws_secret_access_key = *********
2. Check that boto works
# python
Python 2.7.9 (default, Apr 1 2015, 18:18:03)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto
>>> boto.set_stream_logger('boto')
>>> s3 = boto.connect_s3()
2015-05-03 04:04:19,508 boto [DEBUG]:Using access key found in config file.
2015-05-03 04:04:19,508 boto [DEBUG]:Using secret key found in config file.
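The Kinesis connection can be checked the same way before step 3; a short sketch using the stream name from the scripts above (DescribeStream reports ACTIVE once the stream is usable):

import boto.kinesis

conn = boto.kinesis.connect_to_region('ap-northeast-1')
desc = conn.describe_stream('masayuki')
print desc['StreamDescription']['StreamStatus']  # should print ACTIVE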
3. Verify putting records to Kinesis (keep putting /var/log/dmesg)
# vim putrecords.py
# python putrecords.py /var/log/dmesg
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257176699974018233308339503106'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257177908899837847937514209282'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257179117825657462566688915458'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257180326751477077195863621634'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257181535677296691825038327810'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257182744603116306454213033986'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257183953528935921083387740162'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257185162454755535712562446338'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257186371380575150341737152514'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257187580306394765039631335426'}
To run it in the background:
# python putrecords.py /var/log/dmesg &
4. Verify putting Apache logs and getting them back
# python putrecords.py /var/log/httpd/*
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257188789232214572976694099970'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257189998158034187605868806146'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257191207083853802235043512322'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257192416009673426897261821954'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257193624935493041526436528130'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257194833861312656567928094722'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257196042787132271197102800898'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257197251712951885963716460546'}
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257198460638771500799049596930'}
# python getRecords.py
access_log.201505030500 is created in the configured bucket on S3
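To confirm the object actually landed in S3, the bucket can be listed with boto as well; a sketch using the same bucket name as getRecords.py:

import boto

s3 = boto.connect_s3()
bucket = s3.get_bucket('masayuki-emr')
# List the per-minute log files uploaded by the consumer
for key in bucket.list(prefix='access_log.'):
    print key.name, key.size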
Amazon Kinesis/Redshift: process access logs with Kinesis and visualize them in Tableau, parts 1-4
http://recipe.kc-cloud.jp/archives/6289
http://recipe.kc-cloud.jp/archives/6362
http://recipe.kc-cloud.jp/archives/6364
http://recipe.kc-cloud.jp/archives/6366
How to use the AWS Python SDK (boto)
http://recipe.kc-cloud.jp/archives/4296
http://recipe.kc-cloud.jp/archives/4300
Getting Started with Boto
http://docs.pythonboto.org/en/latest/getting_started.html#making-connections
AWS regions and endpoints (frequently referenced when using the SDK)
http://docs.aws.amazon.com/ja_jp/general/latest/gr/rande.html
Performing fast INSERT + UPDATE on Amazon Redshift
http://analysis.blog.jp.klab.com/archives/30306912.html
Encoding and decoding JSON in Python
http://qiita.com/unchainendo/items/7865bdcdaadd62f2e435
Mastering string replacement in Python
http://orangain.hatenablog.com/entry/20100503/1272900555
A setup for storing logs in S3 and Redshift
http://www.slideshare.net/mokemokechicken/logs3redshift
When accumulating logs in AWS S3, just use JSON for now
http://librabuch.jp/2014/06/redshift_copy_from_json/
Copying data from Amazon S3 to MySQL
http://docs.aws.amazon.com/ja_jp/datapipeline/latest/DeveloperGuide/dp-copydata-tomysql.html
Learning what Amazon Kinesis is from real-world examples
http://dev.classmethod.jp/cloud/aws/what-is-kinesis/