Skip to content

Instantly share code, notes, and snippets.

Last active June 1, 2023 18:59
Show Gist options
  • Save coleifer/9899de010c647823a14f to your computer and use it in GitHub Desktop.
# from gevent import monkey; monkey.patch_all()
from base64 import b64decode
import datetime
import json
import os
from urlparse import parse_qsl, urlparse
from flask import Flask, Response, abort, request
from peewee import *
from playhouse.berkeleydb import BerkeleyDatabase # Optional.
# 1 pixel transparent GIF, base64-encoded; served back for every beacon hit.
# NOTE(review): constant reconstructed — the scrape dropped the assignment the
# comment above referred to; `analyze()` reads it via app.config['BEACON'].
BEACON = b64decode('R0lGODlhAQABAIAAANvf7wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==')

# Store the database file in the app directory.
APP_DIR = os.path.dirname(__file__)
DATABASE_NAME = os.path.join(APP_DIR, 'analytics.db')
DOMAIN = ''  # TODO: change me.

# Simple JavaScript which will be included and executed on the client-side.
# It fires a request for /a.gif carrying the page URL, referrer and title.
# NOTE(review): the i.src line was truncated in this copy and has been
# restored; without it the script defines variables but never sends a beacon.
JAVASCRIPT = """(function(){
var d=document,i=new Image,e=encodeURIComponent;
i.src='%s/a.gif?url='+e(d.location.href)+'&ref='+e(d.referrer)+'&t='+e(d.title);
})()""".replace('\n', '')

# Flask application settings.
DEBUG = bool(os.environ.get('DEBUG'))
SECRET_KEY = 'secret - change me'  # TODO: change me.

app = Flask(__name__)
# Pull the UPPER_CASE names above (BEACON, JAVASCRIPT, DOMAIN, DEBUG, ...)
# into app.config — the view functions read them from there.
app.config.from_object(__name__)

database = BerkeleyDatabase(DATABASE_NAME)  # or SqliteDatabase(DATABASE_NAME)
class JSONField(TextField):
    """A TextField that transparently (de)serializes its value as JSON."""

    def python_value(self, value):
        # NULL columns come back as None and stay None.
        if value is None:
            return None
        return json.loads(value)

    def db_value(self, value):
        if value is None:
            return None
        return json.dumps(value)
class PageView(Model):
    """One row per tracked page view (one per beacon request)."""
    domain = CharField()
    url = TextField()
    # Fix: the field had `DateTimeField(,` — the default was dropped by the
    # scrape. Indexed because the report script filters on date ranges.
    timestamp = DateTimeField(default=datetime.datetime.now, index=True)
    title = TextField(default='')
    ip = CharField(default='')
    referrer = TextField(default='')
    headers = JSONField()
    params = JSONField()

    class Meta:
        database = database

    @classmethod
    def create_from_request(cls):
        """Create and return a PageView row from the active Flask request.

        Reads the tracked page's URL from ?url=, its title from ?t= and its
        referrer from ?ref= (all set by the client-side JavaScript).
        NOTE(review): decorator and the domain/url/headers/params kwargs were
        reconstructed — the call was truncated in this copy.
        """
        parsed = urlparse(request.args['url'])
        params = dict(parse_qsl(parsed.query))
        return cls.create(
            domain=parsed.netloc,
            url=parsed.path,
            title=request.args.get('t') or '',
            ip=request.headers.get('X-Forwarded-For', request.remote_addr),
            referrer=request.args.get('ref') or '',
            headers=dict(request.headers),
            params=params)
@app.route('/a.gif')
def analyze():
    """Record a page view from the tracking beacon and serve a 1px GIF.

    NOTE(review): the route decorator, abort() and create call were
    reconstructed — the function body was truncated in this copy.
    """
    # The client-side script always supplies ?url=; anything else is noise.
    if not request.args.get('url'):
        abort(404)
    with database.transaction():
        PageView.create_from_request()
    response = Response(app.config['BEACON'], mimetype='image/gif')
    # Prevent caching so every page load fires a fresh beacon request.
    response.headers['Cache-Control'] = 'private, no-cache'
    return response
@app.route('/a.js')
def script():
    """Serve the client-side tracking JavaScript with DOMAIN interpolated."""
    return Response(
        app.config['JAVASCRIPT'] % (app.config['DOMAIN']),
        mimetype='text/javascript')
@app.errorhandler(404)
def not_found(e):
    """Plain-text 404 handler (also hit when analyze() aborts).

    Fix: return status 404 explicitly — a bare Response defaults to 200,
    which defeats registering this as the 404 handler.
    """
    return Response('Not found.', status=404)
if __name__ == '__main__':
    # Ensure the schema exists before serving; safe=True is a no-op when the
    # tables are already present.
    database.create_tables([PageView], safe=True)
    app.run()  # Use Flask's builtin WSGI server.
    # Or for gevent,
    # from gevent.wsgi import WSGIServer
    # WSGIServer(('', 5000), app).serve_forever()
from collections import Counter
import datetime
import optparse
from peewee import *
from analytics import database
from analytics import PageView
def get_query(start, end):
    """Return a PageView select, optionally bounded to [start, end].

    Either bound may be None/falsy, in which case that side is open.
    Fix: the base select was truncated (`query =`) — restored.
    """
    query = PageView.select()
    if start and end:
        query = query.where(PageView.timestamp.between(start, end))
    elif start:
        query = query.where(PageView.timestamp >= start)
    elif end:
        query = query.where(PageView.timestamp <= end)
    return query
def page_views(query):
    """Return the total number of page views matched by `query`."""
    total = query.count()
    return total
def unique_ips(query):
    """Return the number of distinct client IPs in `query`.

    NOTE(review): the query chain was truncated in this copy; reconstructed
    as a group-by on ip, counted.
    """
    return (query
            .select(PageView.ip)
            .group_by(PageView.ip)
            .count())
def top_pages(query, limit):
    """Yield up to `limit` (title, view_count) tuples, most-viewed first.

    NOTE(review): the query chain was truncated in this copy; reconstructed
    to match run_report(), which unpacks `title, count` pairs.
    """
    return (query
            .select(PageView.title, fn.COUNT(PageView.id))
            .group_by(PageView.title)
            .order_by(fn.COUNT(PageView.id).desc())
            .limit(limit)
            .tuples())
def top_traffic_times(query):
    """Bucket page views into 3-hour chunks of the day.

    Returns a list of (label, count, percent) tuples, one per chunk, matching
    the `hour, count, percent` unpacking in run_report().

    Fixes: use .get(i, 0) so chunks with no traffic don't raise KeyError
    (the dict only contains hours that had views), and floor-divide 24 by
    chunks so range() gets an int under Python 3 as well.
    NOTE(review): the middle of the query chain was truncated; reconstructed
    as a group-by on the bucketed hour.
    """
    chunks = 3
    hour = fn.date_part('hour', PageView.timestamp) / chunks
    id_count = fn.COUNT(PageView.id)
    result = dict(query
                  .select(hour, id_count)
                  .group_by(hour)
                  .tuples())
    total = sum(result.values())
    return [
        ('%s - %s' % (i * chunks, (i + 1) * chunks),
         result.get(i, 0),
         (100. * result.get(i, 0)) / total)
        for i in range(24 // chunks)]
def user_agents(query, limit):
    """Return the `limit` most frequent User-Agent values as (value, count)."""
    counts = Counter()
    for page_view in query:
        counts[page_view.headers.get('User-Agent')] += 1
    return counts.most_common(limit)
def languages(query, limit):
    """Return the `limit` most frequent Accept-Language values as (value, count)."""
    counts = Counter()
    for page_view in query:
        counts[page_view.headers.get('Accept-Language')] += 1
    return counts.most_common(limit)
def get_paths(query, limit):
    """Return [(ip, [url, url, ...])] click-paths for up to `limit` IPs.

    NOTE(review): most of this function was truncated in this copy; the
    subquery/GROUP_CONCAT reconstruction below should be verified against the
    original — only the inner select, the `paths` source and the return
    expression survive in this copy.
    """
    inner = (query
             .select(PageView.ip, PageView.url)
             .order_by(PageView.timestamp))
    paths = (PageView
             .select(
                 inner.c.ip,
                 fn.GROUP_CONCAT(inner.c.url).alias('urls'))
             .from_(inner.alias('t1'))
             .group_by(inner.c.ip)
             .limit(limit)
             .tuples())
    return [(ip, urls.split(',')) for ip, urls in paths]
def get_low_high(query):
    """Return the (earliest, latest) timestamps in `query` as display strings.

    NOTE(review): `base` and the order_by chains were truncated in this copy;
    reconstructed as scalar min/max via ordering.
    """
    base = query.select(PageView.timestamp)

    def conv(s):
        # Stored timestamps include microseconds; trim to minutes for display.
        return datetime.datetime.strptime(
            s, '%Y-%m-%d %H:%M:%S.%f').strftime('%Y-%m-%d %H:%M')

    low = base.order_by(PageView.timestamp.asc()).scalar()
    high = base.order_by(PageView.timestamp.desc()).scalar()
    return conv(low), conv(high)
def print_banner(s):
    """Print `s` framed by dashed rules sized to the text.

    Fix: use parenthesized single-argument print, which behaves identically
    under Python 2 (the rest of this script) and also runs under Python 3.
    """
    rule = '-' * len(s)
    print(rule)
    print(s)
    print(rule)
def run_report(start, end, limit, skip_paths=False):
    """Print the analytics report for the date range [start, end].

    `limit` caps the number of rows in each section; `skip_paths` suppresses
    the (expensive) per-IP click-path section.
    Fixes: parenthesized prints (Python 2/3 compatible); restored the banner
    for the Paths section. NOTE(review): the User-Agents/Languages sections
    were reconstructed — the helpers exist above but their call sites were
    lost to truncation in this copy.
    """
    query = get_query(start, end)
    low, high = get_low_high(query)

    print_banner('Overview from %s to %s' % (low, high))
    print('%4d page views' % page_views(query))
    print('%4d unique IPs' % unique_ips(query))

    print_banner('Top Pages')
    for title, count in top_pages(query, limit):
        print('%4d : %s' % (count, title))

    print_banner('Traffic by Hour')
    for hour, count, percent in top_traffic_times(query):
        print('%9s : %s (%s%%)' % (hour, count, round(percent, 1)))

    if not skip_paths:
        print_banner('Paths')
        for ip, path in get_paths(query, limit):
            print(ip)
            for url in path:
                print(' * %s' % url)

    print_banner('User-Agents')
    for user_agent, count in user_agents(query, limit):
        print('%4d : %s' % (count, user_agent))

    print_banner('Languages')
    for language, count in languages(query, limit):
        print('%4d : %s' % (count, language))
def get_parser():
    """Build the command-line option parser for this report script."""
    parser = optparse.OptionParser()
    parser.add_option('-n', '--days', dest='count', type='int',
                      help='Number of days worth of records to analyze.')
    parser.add_option('-d', '--day', dest='day', type='int',
                      help='Day to analyze.')
    parser.add_option('-m', '--month', dest='month', type='int',
                      help='Month to analyze.')
    parser.add_option('-y', '--year', dest='year', type='int',
                      help='Year to analyze.')
    parser.add_option('-r', '--records', dest='records', type='int', default=20,
                      help='Number of records to show')
    parser.add_option('-x', '--no-paths', dest='no_paths', action='store_true',
                      help='Do not print paths')
    return parser
if __name__ == '__main__':
    parser = get_parser()
    options, args = parser.parse_args()
    today = datetime.datetime.now()
    # NOTE(review): several lines here were truncated in this copy
    # (`today =`, the dangling `or`, `start_date =, 1, 1)`, the missing
    # `else:` branches); reconstructed to the obvious intent.
    if options.year or options.month or options.day:
        # Build an explicit start date from whichever parts were supplied,
        # defaulting to Jan 1 of the current year.
        start_date = datetime.datetime(today.year, 1, 1)
        if options.year:
            start_date = start_date.replace(year=options.year)
        if options.month:
            start_date = start_date.replace(month=options.month)
        if options.day:
            start_date = start_date.replace(day=options.day)
    else:
        start_date = None
    end_date = None
    if options.count:
        delta = datetime.timedelta(days=options.count)
        if start_date:
            # Explicit start + day count: report a window going forward.
            end_date = start_date + delta
        else:
            # No explicit start: report the trailing N days.
            start_date = today - delta
    run_report(start_date, end_date, options.records, options.no_paths)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment