Skip to content

Instantly share code, notes, and snippets.

@vitorfs
Last active November 12, 2019 01:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vitorfs/6ca937d6bcc378cab167d5b330dac477 to your computer and use it in GitHub Desktop.
Save vitorfs/6ca937d6bcc378cab167d5b330dac477 to your computer and use it in GitHub Desktop.
Collect page views from Google Analytics
import argparse
from apiclient.discovery import build
import httplib2
from oauth2client import client
from oauth2client import file
from oauth2client import tools
from web.api.models import Article, PageView
# Module configuration for the Google Analytics Reporting API client.

# Evaluates to 300. Not referenced in the visible code -- presumably an
# interval in seconds used by a scheduler elsewhere; confirm before relying on it.
FIVE_MINUTES = 60 * 5

# OAuth scope requested when authorizing (read-only Analytics access).
SCOPES = [
    'https://www.googleapis.com/auth/analytics.readonly',
]

# Discovery document location passed to the service builder.
DISCOVERY_URI = 'https://analyticsreporting.googleapis.com/$discovery/rest'

# Path to the OAuth client secrets file read when building the auth flow.
CLIENT_SECRETS_PATH = 'client_secrets.json'

# Google Analytics view id -- placeholder, replace with the real view id.
VIEW_ID = 'xxxxxxxxxxxxxx'
def initialize_analyticsreporting():
    """Create an authorized Analytics Reporting service object.

    Stored credentials are loaded from ``analyticsreporting.dat``. When none
    are stored, or the stored ones are flagged invalid, the oauth2client
    flow built from ``CLIENT_SECRETS_PATH`` is run to obtain new ones.

    Returns:
        The service object built for the ``analytics`` API, version ``v4``,
        using an HTTP client authorized with the credentials.
    """
    # Flags are parsed from an empty argument list, so the real command line
    # is never consulted; only the defaults of tools.argparser are used.
    flag_parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        parents=[tools.argparser],
    )
    flags = flag_parser.parse_args([])

    # Flow object used only if we have to (re-)authenticate.
    oauth_flow = client.flow_from_clientsecrets(
        CLIENT_SECRETS_PATH,
        scope=SCOPES,
        message=tools.message_if_missing(CLIENT_SECRETS_PATH),
    )

    # The Storage object is handed to run_flow so that, on success, the good
    # credentials get written back to the file.
    credential_store = file.Storage('analyticsreporting.dat')
    credentials = credential_store.get()
    must_authenticate = credentials is None or credentials.invalid
    if must_authenticate:
        credentials = tools.run_flow(oauth_flow, credential_store, flags)

    authorized_http = credentials.authorize(http=httplib2.Http())

    # Build and hand back the service object.
    return build(
        'analytics',
        'v4',
        http=authorized_http,
        discoveryServiceUrl=DISCOVERY_URI,
    )
def get_page_views_report(analytics):
    """Run a ``batchGet`` report of page views per page path.

    The single report request covers two date ranges, in this order:
    a fixed start date up to today, and the last seven days up to today.
    Rows are ordered by page views, descending.

    Args:
        analytics: Service object exposing ``reports().batchGet(body=...)``.

    Returns:
        Whatever ``execute()`` returns for the batch request.
    """
    date_ranges = [
        # Customize the start date here to change the "all time" window.
        {'startDate': '2015-11-23', 'endDate': 'today'},
        {'startDate': '7daysAgo', 'endDate': 'today'},
    ]
    ordering = {
        'fieldName': 'ga:pageviews',
        'orderType': 'VALUE',
        'sortOrder': 'DESCENDING',
    }
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': date_ranges,
        'metrics': [{'expression': 'ga:pageviews'}],
        'dimensions': [{'name': 'ga:pagePath'}],
        'orderBys': [ordering],
    }

    batch_request = analytics.reports().batchGet(
        body={'reportRequests': [report_request]},
    )
    return batch_request.execute()
def parse_response(response):
    """Persist the page views found in an Analytics Reporting API v4 response.

    For every row of every report, the first dimension value is used to look
    up an ``Article`` by ``url``. Each entry of the row's ``metrics`` list is
    then stored as a ``PageView`` for that article, with the entry's position
    in the list used as ``date_range``.

    Rows without dimensions, and rows whose first dimension matches no
    article, are skipped. Metric entries with a missing or empty ``values``
    list are skipped as well.

    Args:
        response: Mapping with an optional ``reports`` list, as returned by
            the report request.

    Returns:
        None. The effect is the ``PageView`` rows created or updated.
    """
    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row.get('dimensions', [])
            if not dimensions:
                continue

            # Only the lookup can legitimately miss; keep the try narrow so a
            # failure while writing page views is never mistaken for it.
            try:
                article = Article.objects.get(url=dimensions[0])
            except Article.DoesNotExist:
                continue

            for date_range, metric in enumerate(row.get('metrics', [])):
                # `or ()` guards against an absent/None 'values' entry, which
                # would otherwise raise TypeError when iterated.
                for value in metric.get('values') or ():
                    PageView.objects.update_or_create(
                        article=article,
                        date_range=date_range,
                        defaults={
                            'page_views': value,
                        },
                    )
def get_google_analytics_data():
    """Fetch the page views report and store its contents.

    Builds the service object, requests the report and parses the response.
    If any of those steps raises, the error is logged with its traceback and
    ``reactor.stop()`` is called.
    """
    import logging

    try:
        analytics = initialize_analyticsreporting()
        response = get_page_views_report(analytics)
        parse_response(response)
    except Exception:
        # A bare `except:` hid every failure (and also caught SystemExit /
        # KeyboardInterrupt); record what went wrong before shutting down.
        logging.getLogger(__name__).exception(
            'Failed to collect page views from Google Analytics'
        )
        # NOTE(review): `reactor` is not imported in the visible part of this
        # file -- presumably Twisted's reactor; confirm it is in scope.
        reactor.stop()
# NOTE(review): python_2_unicode_compatible is only needed on Python 2 and is
# no longer shipped by recent Django releases -- confirm the target version.
@python_2_unicode_compatible
class Article(models.Model):
    """An article, identified by its unique ``url``.

    Every field other than ``url`` may be left blank; ``publication_date``
    may also be NULL.
    """

    # Unique lookup key for the article.
    url = models.CharField(max_length=255, unique=True)

    # Descriptive metadata.
    title = models.CharField(max_length=255, blank=True)
    author = models.CharField(max_length=30, blank=True)
    category = models.CharField(max_length=30, blank=True)
    tags = models.CharField(max_length=255, blank=True)
    publication_date = models.DateTimeField(null=True, blank=True)

    # Image locations and teaser text.
    featured_image_url = models.CharField(max_length=255, blank=True)
    thumbnail_url = models.CharField(max_length=255, blank=True)
    excerpt = models.TextField(blank=True)

    class Meta:
        db_table = 'articles'
        verbose_name = 'article'
        verbose_name_plural = 'articles'

    def __str__(self):
        """Return the article title (an empty string when no title is set)."""
        return self.title
# NOTE(review): python_2_unicode_compatible is only needed on Python 2 and is
# no longer shipped by recent Django releases -- confirm the target version.
@python_2_unicode_compatible
class PageView(models.Model):
    """Page view count of one article for one date range."""

    # Allowed values for `date_range`.
    ALL_TIME = 0
    LAST_7_DAYS = 1
    DATE_RANGE_CHOICES = (
        (ALL_TIME, 'All time'),
        (LAST_7_DAYS, 'Last 7 days'),
    )

    # Deleting the article deletes its page view rows as well.
    article = models.ForeignKey(
        Article,
        on_delete=models.CASCADE,
        related_name='page_views',
    )
    # Refreshed automatically on every save.
    updated_at = models.DateTimeField(auto_now=True)
    date_range = models.PositiveSmallIntegerField(choices=DATE_RANGE_CHOICES)
    page_views = models.PositiveIntegerField(default=0)

    class Meta:
        db_table = 'page_views'
        verbose_name = 'page view'
        verbose_name_plural = 'page views'

    def __str__(self):
        """Return ``'<article id>: <page views>'``."""
        return '{}: {}'.format(self.article_id, self.page_views)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment