Skip to content

Instantly share code, notes, and snippets.

@yusukemurayama
Created January 6, 2016 02:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yusukemurayama/4621c45684041ce0cc16 to your computer and use it in GitHub Desktop.
Save yusukemurayama/4621c45684041ce0cc16 to your computer and use it in GitHub Desktop.
# coding: utf-8
import os
import logging
import re
# Console handler and module logger, both opened up to DEBUG so that every
# record emitted by this script is passed on to the stream handler.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

logger = logging.getLogger(__name__)
logger.addHandler(ch)
logger.setLevel(logging.DEBUG)
class LogReport(object):
    """Report the request paths that produced 4xx responses.

    Log events are fetched from the CloudWatch Logs group ``LOG_GROUP_NAME``
    through boto3, the request path is extracted from every matching
    message, and the number of hits per path is written to ``output_path``.
    """

    # Name of the log group that is queried.
    LOG_GROUP_NAME = 'LOG_GROUP_NAME'
    # Filter pattern template; ``status_code`` is filled in by get_4xx_paths().
    FILTER_PATTERN = '[ip, dummy1, uid, timestamp, request, status_code = ' \
        '%(status_code)s, bytes, referer, user_agent]'
    # Captures the path of a quoted GET/POST/HEAD request line that is
    # followed by a three-digit number.
    PATH_PATTERN = re.compile(
        r'"(?:GET|POST|HEAD) (?P<path>[^ ]+) [^ ]+" \d{3}')

    def __init__(self, output_path):
        """Remember the path of the report file to write.

        Args:
            output_path: Path of the file that output() writes to.
        """
        self.output_path = output_path

    @staticmethod
    def _get_timestamp_with_milliseconds(days_before):
        """Return the timestamp, in milliseconds, of a past local midnight.

        Args:
            days_before: How many days before today the date lies.

        Returns:
            Integer epoch timestamp in milliseconds for 00:00 (local time)
            of that date.
        """
        import time
        from datetime import date, timedelta

        dt = date.today() - timedelta(days=days_before)
        timestamp = time.mktime(dt.timetuple())
        return int(timestamp) * 1000  # seconds -> milliseconds

    def output_report(self):
        """Collect one week of 4xx paths and write them to the report file.

        Sets ``self.client``, ``self.start_time`` and ``self.end_time``,
        which get_4xx_paths() reads.
        """
        # Imported lazily so the module can be loaded without boto3.
        import boto3
        self.client = boto3.client('logs')

        # One week of logs: from midnight eight days ago up to one
        # millisecond before yesterday's midnight.
        # NOTE: no UTC-to-JST conversion is applied to these timestamps.
        self.start_time = self._get_timestamp_with_milliseconds(1 + 7)
        self.end_time = self._get_timestamp_with_milliseconds(1) - 1
        logger.debug('start_time: %d', self.start_time)
        logger.debug('end_time: %d', self.end_time)

        s4xx_paths = self.get_4xx_paths()
        logger.debug('4xx_paths: %s', s4xx_paths)

        # Write the result to the report file.
        self.output(s4xx_paths)

    def get_4xx_paths(self):
        """Return the paths that produced a 4xx status and their hit counts.

        Requires ``self.client``, ``self.start_time`` and ``self.end_time``
        to be set (output_report() does this).

        Returns:
            A list of ``(path, count)`` tuples sorted by count, descending.
        """
        import collections

        counts = collections.Counter()
        params = {
            'logGroupName': self.LOG_GROUP_NAME,
            'filterPattern': self.FILTER_PATTERN % {'status_code': '4*'},
            'startTime': self.start_time,
            'endTime': self.end_time,
        }
        # filter_log_events returns its results page by page; keep asking
        # until the response no longer carries a nextToken, otherwise only
        # the first page would be counted.
        while True:
            response = self.client.filter_log_events(**params)
            for event in response.get('events', []):
                m = self.PATH_PATTERN.search(event['message'])
                if m:
                    counts[m.group('path')] += 1
            next_token = response.get('nextToken')
            if not next_token:
                break
            params['nextToken'] = next_token

        # Sorted by number of occurrences, descending.
        return counts.most_common()

    def output(self, s4xx_paths):
        """Write one ``path: count`` line per entry to ``self.output_path``.

        Nothing is written (a warning is logged instead) when the directory
        of ``output_path`` does not exist.

        Args:
            s4xx_paths: Iterable of ``(path, count)`` tuples.
        """
        # A bare file name has an empty dirname; it lives in the current
        # directory, so check that instead of the empty string.
        directory = os.path.dirname(self.output_path) or os.curdir
        if not os.path.isdir(directory):
            logger.warning(
                'output directory does not exist: %s', directory)
            return

        # Text mode already translates '\n' to the platform line ending;
        # writing os.linesep here would produce '\r\r\n' on Windows.
        fmt = '{path}: {cnt}\n'
        with open(self.output_path, 'w') as fp:
            for path, cnt in s4xx_paths:
                fp.write(fmt.format(path=path, cnt=cnt))
def main():
    """Build the report and write it to ./report.txt."""
    LogReport('./report.txt').output_report()


# Run the report only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment