This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# from gevent import monkey; monkey.patch_all() | |
from base64 import b64decode | |
import datetime | |
import json | |
import os | |
from urlparse import parse_qsl, urlparse | |
from flask import Flask, Response, abort, request | |
from peewee import * | |
from playhouse.berkeleydb import BerkeleyDatabase # Optional. | |
# 1 pixel GIF, base64-encoded.
BEACON = b64decode('R0lGODlhAQABAIAAANvf7wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==')

# Store the database file in the app directory. The directory is resolved to
# an absolute path so the database location does not depend on the working
# directory when ``__file__`` happens to be relative.
APP_DIR = os.path.dirname(os.path.abspath(__file__))
DATABASE_NAME = os.path.join(APP_DIR, 'analytics.db')
DOMAIN = 'http://127.0.0.1:5000'  # TODO: change me.

# Simple JavaScript which will be included and executed on the client-side.
# The ``%s`` placeholder is interpolated with DOMAIN by the ``/a.js`` view;
# newlines are stripped so the snippet is served as a single line.
JAVASCRIPT = """(function(){
var d=document,i=new Image,e=encodeURIComponent;
i.src='%s/a.gif?url='+e(d.location.href)+'&ref='+e(d.referrer)+'&t='+e(d.title);
})()""".replace('\n', '')

# Flask application settings.
# NOTE: any non-empty DEBUG environment value (even "0") enables debug mode.
DEBUG = bool(os.environ.get('DEBUG'))
SECRET_KEY = 'secret - change me'  # TODO: change me.

app = Flask(__name__)
# Load the upper-case names defined above into ``app.config``.
app.config.from_object(__name__)

database = BerkeleyDatabase(DATABASE_NAME)  # or SqliteDatabase(DATABASE_NAME)
class JSONField(TextField):
    """Text column whose values are serialised to and from JSON.

    ``None`` is passed through untouched in both directions.
    """

    def python_value(self, value):
        """Decode the stored JSON text into a Python object."""
        return None if value is None else json.loads(value)

    def db_value(self, value):
        """Encode a Python object as JSON text for storage."""
        return None if value is None else json.dumps(value)
class PageView(Model):
    """One recorded page view, as reported by the tracking beacon."""

    # Network location (host[:port]) of the tracked URL.
    domain = CharField()
    # Path component of the tracked URL.
    url = TextField()
    # Time the row was created (naive local time from ``datetime.now``).
    timestamp = DateTimeField(default=datetime.datetime.now, index=True)
    # Document title reported by the client, '' when absent.
    title = TextField(default='')
    # Client IP address, '' when it could not be determined.
    ip = CharField(default='')
    # Referring URL reported by the client, '' when absent.
    referrer = TextField(default='')
    # Request headers of the beacon request, stored as JSON.
    headers = JSONField()
    # Query-string parameters of the tracked URL, stored as JSON.
    params = JSONField()

    class Meta:
        database = database

    @classmethod
    def create_from_request(cls):
        """Create and return a row from the current Flask request.

        Reads the ``url``, ``t`` (title) and ``ref`` (referrer) query
        arguments of the beacon request. ``url`` is required; a missing
        value raises from the ``request.args['url']`` lookup.
        """
        parsed = urlparse(request.args['url'])
        params = dict(parse_qsl(parsed.query))

        # X-Forwarded-For may hold a comma-separated chain
        # ("client, proxy1, proxy2"); only the first entry is the client.
        # Fall back to the socket address, and to '' so the column never
        # receives None.
        forwarded = request.headers.get('X-Forwarded-For') or ''
        ip = forwarded.split(',')[0].strip() or request.remote_addr or ''

        # Use ``cls`` so subclasses create instances of themselves.
        return cls.create(
            domain=parsed.netloc,
            url=parsed.path,
            title=request.args.get('t') or '',
            ip=ip,
            referrer=request.args.get('ref') or '',
            headers=dict(request.headers),
            params=params)
@app.route('/a.gif')
def analyze():
    """Record a page view and answer with the 1-pixel tracking GIF.

    Requests without a non-empty ``url`` argument are rejected with 404.
    """
    tracked_url = request.args.get('url')
    if not tracked_url:
        abort(404)

    # Persist the page view inside a transaction.
    with database.transaction():
        PageView.create_from_request()

    beacon = Response(app.config['BEACON'], mimetype='image/gif')
    # Ask clients not to reuse a cached copy of the beacon.
    beacon.headers['Cache-Control'] = 'private, no-cache'
    return beacon
@app.route('/a.js')
def script():
    """Serve the tracking JavaScript with the configured domain filled in."""
    template = app.config['JAVASCRIPT']
    domain = app.config['DOMAIN']
    body = template % domain
    return Response(body, mimetype='text/javascript')
@app.errorhandler(404)
def not_found(e):
    """Return a plain-text body for 404 errors.

    The status is set explicitly: a bare ``Response`` defaults to 200, which
    would report "not found" conditions to clients as successes.
    """
    return Response('Not found.', status=404)
if __name__ == '__main__':
    # Create the PageView table (with safe=True) before serving requests.
    database.create_tables([PageView], safe=True)
    try:
        app.run()  # Use Flask's builtin WSGI server.
        # Or for gevent,
        # from gevent.wsgi import WSGIServer
        # WSGIServer(('', 5000), app).serve_forever()
    finally:
        # Close the database even when the server exits with an exception.
        database.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import Counter | |
import datetime | |
import optparse | |
from peewee import * | |
from analytics import database | |
from analytics import PageView | |
def get_query(start, end):
    """Build a PageView query restricted to an optional time window.

    ``start`` and ``end`` are each optional (any falsy value means "no
    bound"). With both set the timestamp must lie between them; with only
    one set the query is bounded on that side only.
    """
    views = PageView.select()
    if start and end:
        return views.where(PageView.timestamp.between(start, end))
    if start:
        return views.where(PageView.timestamp >= start)
    if end:
        return views.where(PageView.timestamp <= end)
    return views
def page_views(query):
    """Return the number of rows matched by ``query``."""
    total = query.count()
    return total
def unique_ips(query):
    """Return how many distinct IP addresses appear in ``query``."""
    # One row per IP address; counting those rows gives the distinct total.
    per_ip = query.select(PageView.ip).group_by(PageView.ip)
    return per_ip.count()
def top_pages(query, limit):
    """Return up to ``limit`` ``(title, view_count)`` rows, most viewed first."""
    views_per_title = fn.COUNT(PageView.id)
    most_viewed_first = fn.COUNT(PageView.id).desc()

    ranked = query.select(PageView.title, views_per_title)
    ranked = ranked.group_by(PageView.title)
    ranked = ranked.order_by(most_viewed_first)
    return ranked.tuples().limit(limit)
def top_traffic_times(query):
    """Summarise traffic in 3-hour buckets across the day.

    Returns a list with one ``(label, count, percent)`` tuple per bucket,
    in chronological order, where ``label`` looks like ``'0 - 3'``,
    ``count`` is the number of page views in the bucket and ``percent`` is
    its share of all counted views. Buckets without traffic are reported
    with a count of 0, and every percentage is 0.0 when there are no views
    at all.
    """
    chunks = 3
    # NOTE(review): relies on the database performing integer division here
    # so that the keys are whole bucket indexes -- confirm for the backend.
    hour = fn.date_part('hour', PageView.timestamp) / chunks
    id_count = fn.COUNT(PageView.id)
    result = dict(query
                  .select(hour, id_count)
                  .group_by(hour)
                  .order_by(hour)
                  .tuples())
    total = sum(result.values())

    buckets = []
    # Floor division keeps the bucket count an int on Python 2 and 3.
    for i in range(24 // chunks):
        # Buckets with no page views are simply absent from the result.
        count = result.get(i, 0)
        # Guard against dividing by zero when nothing was recorded.
        percent = (100. * count) / total if total else 0.
        label = '%s - %s' % (i * chunks, (i + 1) * chunks)
        buckets.append((label, count, percent))
    return buckets
def user_agents(query, limit):
    """Return the ``limit`` most common User-Agent header values.

    Each entry is a ``(user_agent, count)`` pair; page views without the
    header are tallied under ``None``.
    """
    tally = Counter()
    for page_view in query:
        tally[page_view.headers.get('User-Agent')] += 1
    return tally.most_common(limit)
def languages(query, limit):
    """Return the ``limit`` most common Accept-Language header values.

    Each entry is a ``(accept_language, count)`` pair; page views without
    the header are tallied under ``None``.
    """
    tally = Counter()
    for page_view in query:
        tally[page_view.headers.get('Accept-Language')] += 1
    return tally.most_common(limit)
def get_paths(query, limit):
    """Return the URLs visited per IP address, busiest IPs first.

    The result is a list of up to ``limit`` ``(ip, [url, ...])`` pairs,
    ordered by the number of URLs recorded for the IP, descending.
    """
    # A newline is used as the GROUP_CONCAT separator instead of the default
    # comma: commas are legal inside URL paths, so splitting on them would
    # break such URLs into bogus entries.
    separator = '\n'

    # The inner query orders views by timestamp so the concatenated URLs
    # presumably come out in visit order -- the database is not shown to
    # guarantee this.
    inner = (query
             .select(PageView.ip, PageView.url)
             .order_by(PageView.timestamp))
    paths = (PageView
             .select(
                 PageView.ip,
                 fn.GROUP_CONCAT(PageView.url, separator))
             .from_(inner.alias('t1'))
             .group_by(PageView.ip)
             .order_by(fn.COUNT(PageView.url).desc())
             .tuples()
             .limit(limit))
    return [(ip, urls.split(separator)) for ip, urls in paths]
def get_low_high(query):
    """Return the timestamps of the first and last page view in ``query``.

    "First" and "last" are determined by row id. Each value is formatted as
    ``'YYYY-MM-DD HH:MM'``, or is ``None`` when the query matches no rows.
    """
    base = query.select(PageView.timestamp)

    def conv(value):
        # No rows matched: there is nothing to format.
        if value is None:
            return None
        # The value may already be a datetime, or a string with or without
        # fractional seconds, depending on how the driver returns it.
        if not isinstance(value, datetime.datetime):
            try:
                value = datetime.datetime.strptime(
                    value, '%Y-%m-%d %H:%M:%S.%f')
            except ValueError:
                value = datetime.datetime.strptime(
                    value, '%Y-%m-%d %H:%M:%S')
        return value.strftime('%Y-%m-%d %H:%M')

    low = base.order_by(PageView.id.asc()).scalar()
    high = base.order_by(PageView.id.desc()).scalar()
    return conv(low), conv(high)
def print_banner(s):
    """Print ``s`` framed above and below by a dashed rule of equal length."""
    rule = '-' * len(s)
    # Parenthesised single-argument form works as a print statement on
    # Python 2 and as a function call on Python 3.
    print(rule)
    print(s)
    print(rule)
def run_report(start, end, limit, skip_paths=False):
    """Print the analytics report for the given time window to stdout.

    ``start`` / ``end`` bound the page views considered (either may be
    falsy for "no bound"), ``limit`` caps the number of rows in the ranked
    sections, and ``skip_paths`` suppresses the per-IP paths section.
    """
    query = get_query(start, end)

    # Every print below uses the parenthesised single-argument form, which
    # behaves the same on Python 2 and Python 3.
    low, high = get_low_high(query)
    print_banner('Overview from %s to %s' % (low, high))
    print('%4d page views' % page_views(query))
    print('%4d unique IPs' % unique_ips(query))

    print_banner('Top Pages')
    for title, count in top_pages(query, limit):
        print('%4d : %s' % (count, title))

    print_banner('Traffic by Hour')
    for hour, count, percent in top_traffic_times(query):
        print('%9s : %s (%s%%)' % (hour, count, round(percent, 1)))

    if not skip_paths:
        print_banner('Paths')
        for ip, path in get_paths(query, limit):
            print(ip)
            for url in path:
                print(' * %s' % url)
def get_parser():
    """Build the command-line option parser for the report script."""
    # (flags, keyword arguments) for each option, in help-output order.
    option_specs = (
        (('-n', '--days'),
         dict(dest='count', type='int',
              help='Number of days worth of records to analyze.')),
        (('-d', '--day'),
         dict(dest='day', type='int',
              help='Day to analyze.')),
        (('-m', '--month'),
         dict(dest='month', type='int',
              help='Month to analyze.')),
        (('-y', '--year'),
         dict(dest='year', type='int',
              help='Year to analyze.')),
        (('-r', '--records'),
         dict(dest='records', type='int', default=20,
              help='Number of records to show')),
        (('-x', '--no-paths'),
         dict(dest='no_paths', action='store_true',
              help='Do not print paths')),
    )

    parser = optparse.OptionParser()
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)
    return parser
if __name__ == '__main__':
    parser = get_parser()
    options, args = parser.parse_args()

    database.connect()
    try:
        today = datetime.date.today()

        # An explicit year/month/day starts from January 1st of the current
        # year and overrides only the parts that were given.
        if options.year or options.month or options.day:
            start_date = datetime.date(today.year, 1, 1)
            if options.year:
                start_date = start_date.replace(year=options.year)
            if options.month:
                start_date = start_date.replace(month=options.month)
            if options.day:
                start_date = start_date.replace(day=options.day)
        else:
            start_date = None

        end_date = None
        if options.count:
            delta = datetime.timedelta(days=options.count)
            if start_date:
                # Explicit start: report on the following ``count`` days.
                end_date = start_date + delta
            else:
                # No explicit start: report on the last ``count`` days.
                start_date = today - delta

        run_report(start_date, end_date, options.records, options.no_paths)
    finally:
        # Release the connection even if the report fails part-way through.
        database.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment