Skip to content

Instantly share code, notes, and snippets.

@lavr
Last active December 15, 2017 10:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lavr/83b78d332235952dd2b62179b68f5960 to your computer and use it in GitHub Desktop.
Save lavr/83b78d332235952dd2b62179b68f5960 to your computer and use it in GitHub Desktop.
ArmNginx logster
#!/usr/bin/python
# encoding: utf-8
import time
import re
from collections import defaultdict
import redis
import pytils
import datetime
import jinja2
import psycopg2
def get_connection():
    """Open a connection to the local 'armpsco' PostgreSQL database.

    Returns a psycopg2 connection object.

    Raises psycopg2.Error (or OperationalError) when the database is
    unreachable.  The previous version swallowed every exception with a
    bare ``except: pass`` and returned None, which made callers fail
    later with a confusing ``AttributeError: 'NoneType' object has no
    attribute 'cursor'``; letting the original error propagate keeps the
    real cause visible.
    """
    return psycopg2.connect("dbname='armpsco' user='invitroro' host='localhost' password='roro'")
def get_disabled_codes():
    """Return the codes of branch offices disabled in the last 3000 days.

    Fetches ``branch_office.code`` rows where ``deleted = 'true'``.
    Fix: the cursor and connection are now closed explicitly — this
    script runs from cron every few minutes, and the original leaked a
    database connection on every run.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""SELECT code FROM branch_office where deleted = 'true' and updated_time>NOW() - INTERVAL '3000 days'""")
            return [row[0] for row in cur.fetchall()]
        finally:
            cur.close()
    finally:
        conn.close()
def clean_disabled():
    """Remove disabled branch offices from the 'sync:all' redis zset.

    Each code is removed both as the raw byte string and as its unicode
    form, since members may have been stored either way.
    """
    for code in get_disabled_codes():
        if code:
            for member in (code, code.decode('utf-8')):
                redis_client.zrem('sync:all', member)
# Jinja2 template for the "sync age" report page: one table row per
# branch office (MO) showing how long ago it last synchronized.
# Column headers are Russian runtime text ("MO code", "last sync date").
HTML = u"""<html>
<head>
<title></title>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
</head>
<body bgcolor=white>
<table>
<tr><td>Код МО<td>Дата последней синхронизации<td></tr>
{% for row in rows %}<tr><td>{{ row['MO'] }}</td><td>{{ row['time_str']}}</td><td>{{ row['datetime'] }}</td></tr>{% endfor %}
</table>
</body>
</html>
"""
# Client for the local redis instance that holds the 'sync:all' zset
# (member = medoffice code, score = unix time of last successful sync).
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
if __name__ == '__main__':
clean_disabled()
t = time.time()
data = []
for key, value in redis_client.zrange('sync:all', 0, -1, withscores=1):
data.append([key, value])
data.sort(key=lambda x: x[1])
rows = []
for key, value in data:
#if t - value < 3600*24:
# continue
rows.append({'MO': unicode(key, 'utf-8', 'ignore'),
'time_str': pytils.dt.distance_of_time_in_words(value),
'datetime': datetime.datetime.fromtimestamp(value)})
#print key, pytils.dt.distance_of_time_in_words(value).encode('utf-8'), datetime.datetime.fromtimestamp(value).isoformat()
print jinja2.Environment().from_string(HTML).render(rows=rows).encode('utf-8')
# encoding: utf-8
import time
import re
from collections import defaultdict
import redis
from logster.logster_helper import MetricObject, LogsterParser
from logster.logster_helper import LogsterParsingException
# Dead-code stash: minimal local stand-ins for the logster classes,
# deliberately disabled by wrapping them in a string literal.
# Presumably kept for running/debugging this module without logster
# installed — re-enable by removing the quotes.  TODO confirm.
"""
class LogsterParser(object):
    pass
class LogsterParsingException(Exception):
    pass
class MetricObject(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
    def __repr__(self):
        return "MetricObject %s %s" % (self.args, self.kwargs)
"""
# Client for the local redis instance that holds the 'sync:all' zset.
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
# Cyrillic -> rough Latin transliteration pairs, used to turn branch
# office codes into ASCII-safe graphite metric name segments.
symbols = (u"абвгдеёжзийклмнопрстуфхцчшщъыьэюяАБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ",
           u"abvgdeejzijklmnoprstufhzcss_y_euaABVGDEEJZIJKLMNOPRSTUFHZCSS_Y_EUA")
# Ordinal-to-ordinal mapping usable by unicode.translate().
translit_table = dict(zip([ord(src) for src in symbols[0]],
                          [ord(dst) for dst in symbols[1]]))


def translit(s):
    """Decode *s* from UTF-8 and transliterate Cyrillic letters to Latin."""
    return s.decode('utf-8').translate(translit_table)
class HTTPStatus(object):
    """Tallies HTTP responses into coarse per-class buckets (1xx..5xx)."""

    def __init__(self):
        # One counter per status class; keys double as metric names.
        self.statuses = {'http_1xx': 0, 'http_2xx': 0, 'http_3xx': 0, 'http_4xx': 0, 'http_5xx': 0}

    def increase(self, status):
        """Bump the counter for an already-resolved bucket key."""
        self.statuses[status] += 1

    def add_status(self, status):
        """Classify a numeric status code and count it.

        Codes below 100 fall into the 1xx bucket and codes of 600 or
        more into the 5xx bucket, matching the original if-chain.
        """
        bucket = status // 100
        if bucket < 1:
            bucket = 1
        elif bucket > 5:
            bucket = 5
        self.increase('http_%dxx' % bucket)
def normalize_key(s):
    """Make *s* safe as a graphite metric segment: graphite treats '.'
    as a path separator, so dots and spaces both become underscores."""
    for forbidden in ('.', ' '):
        s = s.replace(forbidden, '_')
    return s
def unserialize_utf8(s):
    """
    Decode backslash escape sequences embedded by nginx into raw bytes,
    e.g. the 'branch:' field of the User-Agent header:
    \xD0\xA0\xD0\x9E\xD0\x9C\xD0\x90-\xD0\xA1\xD0\x90\xD0\x9C-\xD0\x9C\xD0\xA7 -> РОМА-САМ-МЧ
    """
    # Python 2 only: the 'string_escape' codec does not exist in Python 3
    # (the closest equivalent there is 'unicode_escape' on bytes).
    return s.decode('string_escape')
class AccessLogCollector(object):
    """Aggregates access-log counters (request count, bytes in/out,
    total service time, status classes) and emits them as graphite
    MetricObject items."""

    # Class-level defaults: augmented assignment (+=) rebinds these as
    # instance attributes on first update (ints are immutable), so
    # instances do not actually share counters.
    count = 0
    bytes_sent = 0
    request_time = 0
    request_body_size = 0

    def __init__(self):
        self.http_statuses = HTTPStatus()

    def add_data(self, status, body_length, request_time, request_body_size=0):
        """Account one request.

        status            -- numeric HTTP status code
        body_length       -- response body size in bytes
        request_time      -- total request time in seconds
        request_body_size -- request size in bytes (default 0)
        """
        self.count += 1
        self.bytes_sent += body_length
        self.request_body_size += request_body_size
        self.request_time += request_time
        self.http_statuses.add_status(status)

    def get_metrics(self, duration, prefix=''):
        """Yield a MetricObject per collected counter, names prefixed
        with ``prefix + '.'`` when a prefix is given.

        ``duration`` is accepted for logster API compatibility but not
        used.  Fix: previously raised ZeroDivisionError computing the
        average request time when no requests had been recorded.
        """
        if prefix:
            prefix = prefix + '.'
        yield MetricObject("%sbytes_sent" % prefix, self.bytes_sent, "Bytes sent")
        # Guard against division by zero on an empty collector.
        if self.count:
            avg_ms = 1000.0 * self.request_time / self.count
        else:
            avg_ms = 0.0
        yield MetricObject("%savg_request_time" % prefix, avg_ms, "Avg time in ms")
        yield MetricObject("%scount" % prefix, self.count, "Requests count")
        yield MetricObject("%srequest_size" % prefix, self.request_body_size, "bytes received")
        for k, v in self.http_statuses.statuses.items():
            yield MetricObject("%s%s" % (prefix, k), v, "Responses by status")
class ArmNginxLogster(LogsterParser):
    """
    Parse the ARM nginx access log, which uses this format:
    log_format main2 '$host $remote_addr - $remote_user [$time_local] '
    '"$request" $status $body_bytes_sent '
    '"$http_referer" "$http_user_agent" [ $upstream_addr $upstream_response_time ] $request_time $request_length proto:$scheme';
    Sample lines:
    armfrontend.invitro.ru 172.16.242.133 - - [23/Jun/2016:16:58:48 +0300] "POST /armpsco/service/branchoffice/synchronization HTTP/1.1" 200 474 "-" "ARMPS-MO-2.14.11(f0f4906.2016-06-16-15-50-51),OS:Windows 8.1 (6.3) x86,JRE:Oracle Corporation JRE 1.8.0_40" [ 172.17.9.69:8090 0.005 ] 0.006 1034
    armfrontend.invitro.ru 172.16.245.79 - - [23/Jun/2016:16:58:48 +0300] "POST /armpsco/service/branchoffice/ping HTTP/1.1" 200 240 "-" "ARMPS-MO-2.14.8(832964d.2016-05-11-12-31-57),OS:Windows 8.1 (6.3) x86,JRE:Oracle Corporation JRE 1.8.0_40" [ 172.17.9.69:8090 0.001 ] 0.001 2034 proto:http
    """

    def __init__(self, option_string=None):
        # option_string is part of the logster parser interface; unused here.
        self.metrics_by_uri = defaultdict(AccessLogCollector)       # per-URI counters
        self.metrics_total = AccessLogCollector()                   # whole-server counters
        self.metrics_esb_by_uri = defaultdict(AccessLogCollector)   # ESB traffic only
        self.metrics_versions = {}                  # client version string -> request count
        self.metrics_medoffice = defaultdict(int)   # medoffice code -> request count
        self.metrics_proto = defaultdict(int)       # 'http'/'https' -> request count
        self.client_ips = set()                     # distinct remote addresses seen
        # medoffice code -> wall-clock time of its last successful sync request
        self.successfull_sync_by_medoffice = {}
        # metrics_by_uri = {'/armpsco/service/branchoffice/synchronization': {'cnt': 100, 'bytes': 1000, 'time': 300, 'status': {'200': 100, '400': 5, '500': 4} }
        regexp = '(?P<host>.*?)\ (?P<remote_addr>.*?)\ \-\ (?P<remote_user>.*?)\ \[(?P<time_local>.*?)\]\ \"(?P<request>.*?)\"\ (?P<status>.*?)\ (?P<body_bytes_sent>.*?)\ \"(?P<http_referer>.*?)\"\ \"(?P<http_user_agent>.*?)\" \[ (?P<upstreams>.*?) \]\ (?P<total_time>.*?) (?P<request_body_size>.*?) proto:(?P<request_proto>.*?)$'
        self.reg = re.compile(regexp)

    def parse_user_agent(self, user_agent_string):
        """Extract the ARM client version (text before the first '(')
        and the medoffice code (the 'branch:' field) from the User-Agent
        header.  Returns {'version': ..., 'medoffice': ...}; the
        medoffice falls back to '_' when no 'branch:' field is present."""
        version = user_agent_string.split('(', 1)[0]
        parts = user_agent_string.split('branch:', 1)
        if len(parts)>1:
            medoffice = parts[1].split(',')[0]
        else:
            medoffice = '_'
        return {'version': version, 'medoffice': medoffice}

    def compose_regexp(self):
        """Build a named-group capture regexp from an nginx log_format
        string.  NOTE(review): this covers fewer fields than the
        hand-written regexp in __init__ and is not called anywhere in
        this file — presumably kept as a one-off generator aid."""
        conf = '$host $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'
        regex = ''.join('(?P<' + g + '>.*?)' if g else re.escape(c) for g, c in re.findall(r'\$(\w+)|(.)', conf))
        #print "regex=", regex
        return regex

    def _parse_line(self, line):
        # Raises AttributeError when the line does not match (match()
        # returns None); parse_line converts that into a
        # LogsterParsingException.
        m = self.reg.match(line)
        return m.groupdict()

    def parse_line(self, line):
        '''This function should digest the contents of one line at a time, updating
        object's state variables. Takes a single argument, the line to be parsed.'''
        try:
            # Apply regular expression to each line and extract interesting bits.
            data = self._parse_line(line)
        except Exception as e:
            raise LogsterParsingException("regmatch or contents failed with %s" % e)
        if not data:
            raise LogsterParsingException("regmatch failed to match")
        """
        data = {
            'status': '200',
            'total_time': '0.006',
            'body_bytes_sent': '474',
            'remote_addr': '172.16.242.133',
            'upstreams': '172.17.9.69:8090 0.005',
            'request': 'POST /armpsco/service/branchoffice/synchronization HTTP/1.1',
            'host': 'armfrontend.invitro.ru',
            'http_user_agent': 'ARMPS-MO-2.18.4(5e89d94.2016-12-09-13-17-17),OS:Windows 7 (6.1) x86,JRE:Oracle Corporation JRE 1.8.0_40,user:vtatarinceva,branch:\xD0\xA0\xD0\x9E\xD0\x9C\xD0\x90-\xD0\xA1\xD0\x90\xD0\x9C-\xD0\x9C\xD0\xA7',
            'time_local': '23/Jun/2016:16:58:48 +0300',
            'request_proto': 'https'
        }
        """
        # Reduce the request URI to a stable metric key: strip the
        # '/armpsco/' prefix, drop the query string, and collapse
        # asset/software-update paths into a single key each.
        uri = data['request'].split(' ')[1].replace('/armpsco/', '')
        uri = uri.split('?',1)[0]
        if uri.startswith('softwareUpdate'):
            parts = uri.split('/')
            if len(parts)>1:
                uri = '/'.join(parts[:2])
        elif uri.startswith('stylesheets'):
            uri = 'stylesheets'
        elif uri.startswith('javascripts'):
            uri = 'javascripts'
        elif uri.startswith('images'):
            uri = 'images'
        # Graphite treats '.' as a path separator, so neutralize it.
        uri = uri.replace('.', '_').replace('/', '_')
        status = int(data['status'])
        request_time = float(data['total_time'])
        body_bytes_sent = int(data['body_bytes_sent'])
        request_body_size = int(data['request_body_size'])
        user_agent_data = self.parse_user_agent(data['http_user_agent'])
        version = user_agent_data['version']
        medoffice = user_agent_data['medoffice']
        self.metrics_total.add_data(status=status, body_length=body_bytes_sent, request_time=request_time, request_body_size=request_body_size)
        if data['remote_addr'] == '172.17.8.174':
            # separate esb data counters
            self.metrics_esb_by_uri[uri].add_data(status=status, body_length=body_bytes_sent, request_time=request_time, request_body_size=request_body_size)
        if uri:
            self.metrics_by_uri[uri].add_data(status=status, body_length=body_bytes_sent, request_time=request_time, request_body_size=request_body_size)
        self.metrics_versions.setdefault(version, 0)
        self.metrics_versions[version] += 1
        self.metrics_medoffice[medoffice] += 1
        self.metrics_proto[data['request_proto']] += 1
        self.client_ips.add(data['remote_addr'])
        # Remember when this medoffice last synchronized successfully.
        if ('sync' in uri) and status == 200:
            self.successfull_sync_by_medoffice[medoffice] = time.time()

    def get_state(self, duration):
        """Yield all accumulated MetricObject's for the last *duration*
        seconds, record last-sync times into redis, and emit a histogram
        of offices by time since last sync."""
        self.duration = float(duration)
        for uri, data in self.metrics_by_uri.items():
            for o in data.get_metrics(duration=self.duration, prefix='uri_all.'+uri):
                yield o
        for uri, data in self.metrics_esb_by_uri.items():
            for o in data.get_metrics(duration=self.duration, prefix='uri_esb.' + uri):
                yield o
        for o in self.metrics_total.get_metrics(duration=self.duration, prefix='overall'):
            yield o
        for version, count in self.metrics_versions.items():
            yield MetricObject("version.%s" % normalize_key(version), count, "Versions count")
        # Not used in practice, so disabled
        #if self.metrics_medoffice:
        #    mo_avg = sum(self.metrics_medoffice.values())/len(self.metrics_medoffice)
        #    for medoffice, count in self.metrics_medoffice.items():
        #        if count > mo_avg:
        #            # there are ~a thousand medoffices, so per-office metrics are pointless;
        #            # only send the offices that sent more than the average
        #            yield MetricObject("mo.%s" % normalize_key(translit(unserialize_utf8(medoffice))), count, "Medoffice count")
        yield MetricObject("ips.count", len(self.client_ips), "uniq ips count")
        for proto, count in self.metrics_proto.items():
            yield MetricObject("proto.%s" % proto, count, "requests by protocol")
        # Store last-successful-sync timestamps in redis
        pipe = redis_client.pipeline()
        for medoffice, t in self.successfull_sync_by_medoffice.items():
            # NOTE(review): redis-py 2.x zadd(name, score, member)
            # argument order; redis-py 3.x changed this to a mapping —
            # confirm the installed client version.
            pipe.zadd('sync:all', t, unserialize_utf8(medoffice))
            #k = 'sync:%s' % medoffice
            #pipe.set(k, t)
            #pipe.expire(k, 86400*30) # no point keeping these forever
        pipe.execute()
        # Then pull back from redis the count of offices per
        # "time since last sync" bucket.
        t = time.time()
        times = defaultdict(int)  # NOTE(review): unused leftover
        #for k, v in redis_client.zrangebyscore('sync:all', 0, -1):
        for t1, t2, header in [[0, 3600, '1h'],
                               [3600, 3*3600, '3h'],
                               [3*3600, 24*3600, '1d'],
                               [24*3600, 3*24*3600, '3d'],
                               [3*24*3600, 7*24*3600, '7d'],
                               [7*24*3600, 100*24*3600, 'gt7d']
                               ]:
            cnt = redis_client.zcount('sync:all', t - t2, t - t1)
            yield MetricObject("lastsync.%s" % header, cnt, "synced offices")
def tests():
p = ArmNginxLogster()
lines = """armfrontend.invitro.ru 172.31.229.240 - - [20/Mar/2017:11:03:35 +0300] "POST /armpsco/service/branchoffice/ping HTTP/1.1" 200 240 "-" "ARMPS-MO-2.18.7(d41e9ec.2016-12-27-18-33-57),OS:Windows 7 (6.1) x86,JRE:Oracle Corporation JRE 1.8.0_40,user:amihailova,branch:\xD0\xA0\xD0\x9E\xD0\x9C\xD0\x90-\xD0\xA7\xD0\x9B\xD0\x91-\xD0\xA1\xD0\x92" [ 127.0.0.1:8090 0.002 ] 0.029 727 proto:http
armfrontend.invitro.ru 172.16.25.43 - - [20/Mar/2017:11:03:35 +0300] "POST /armpsco/service/branchoffice/synchronization/v2 HTTP/1.1" 200 499 "-" "ARMPS-MO-2.18.7(d41e9ec.2016-12-27-18-33-57),OS:Windows 7 (6.1) x86,JRE:Oracle Corporation JRE 1.8.0_40,user:tmilyushina,branch:\xD0\x98\xD0\x92\xD0\x90\xD0\x9D-\xD0\x98\xD0\xA0\xD0\x9A-\xD0\xA3\xD0\x94" [ 127.0.0.1:8085 0.016 ] 0.020 1119 proto:http"""
for line in lines.split('\n'):
p.parse_line(line)
print list(p.get_state(100))
if __name__ == '__main__':
tests()
python -c "import logster"
yum install python-pip logcheck
#install https://github.com/etsy/logster
pip install -U pip redis
pip install https://github.com/etsy/logster/archive/master.zip
cp ArmNginx.py /usr/lib/python2.6/site-packages/logster/parsers/
/usr/bin/logster --dry-run -p sv-ws07.nginx.armfrontend --output=graphite --debug --graphite-protocol=udp --graphite-host=graphite.arm.invitronet.ru:12003 logster.parsers.ArmNginx.ArmNginxLogster /var/log/nginx/armfront_access.log
*/1 * * * * root /usr/bin/logster -p sv-ws07.nginx.armfrontend --output=graphite --graphite-protocol=udp --graphite-host=graphite.arm.invitronet.ru:12003 logster.parsers.ArmNginx.ArmNginxLogster /var/log/nginx/armfront_access.log
*/10 * * * * root /usr/local/bin/arm-sync-age.py > /var/www/html/armpsco-sync-age.html
# encoding: utf-8
import time
import redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
if __name__ == '__main__':
t = time.time()
data = []
for key, value in redis_client.zrange('sync:all', 0, -1, withscores=1):
data.append([key, t-value])
data.sort(key=lambda x: x[1])
for key, value in data:
print key, value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment