Skip to content

Instantly share code, notes, and snippets.

Last active July 9, 2019 21:48
Show Gist options
  • Save sashka/f764e112b12a93eb8a539823298c9b07 to your computer and use it in GitHub Desktop.
Save sashka/f764e112b12a93eb8a539823298c9b07 to your computer and use it in GitHub Desktop.
# encoding: utf-8
from __future__ import print_function
import argparse
import gzip
import io
import os
from time import strftime
from urlparse import urljoin
from atomicfile import AtomicFile
from collections import Counter
from vitrina.frontend.settings import DATABASE, MEDIA_PATH
from import Connection
from import Product
from import Flatpage
from import SitemapLink
from vitrina.third_party.elementflow import xml as elementflow_xml
# Gzip-writing wrapper around ``AtomicFile``: the constructor opens an
# ``AtomicFile`` for ``name`` and layers a ``gzip.GzipFile`` on top of it
# through the ``fileobj`` argument.
# NOTE(review): this copy of the file has lost its indentation, and the bodies
# of ``__exit__`` (everything after ``if exc_type:``), ``write`` and ``close``
# are missing. Restore them from the original source before running; the
# lines below are not a complete implementation.
class AtomicFileGz(object):
def __init__(self, name, mode="w+b", createmode=None):
# Underlying file object; name, mode and createmode are passed through as given.
self._f = AtomicFile(name, mode, createmode)
# Gzip stream that writes its compressed output into self._f.
self._gz = gzip.GzipFile(name, mode, fileobj=self._f)
def __enter__(self):
# Context-manager entry hands back the wrapper itself.
return self
def __exit__(self, exc_type, exc_value, exc_tb):
# NOTE(review): the branch body and whatever followed it are missing here.
# Presumably the file is only closed/committed when no exception occurred -
# TODO confirm against the original.
if exc_type:
# NOTE(review): body missing; presumably forwards ``string`` to self._gz - verify.
def write(self, string):
# NOTE(review): body missing; presumably closes self._gz and then self._f - verify.
def close(self):
# Sitemap index writer. ``flush`` writes ``sitemap.xml`` into ``path`` with one
# ``<sitemap>`` entry per file name collected from the registered urlsets.
# NOTE(review): indentation and the docstring's triple quotes were lost in this
# copy, so the descriptive lines directly below the class header are bare text
# rather than a docstring. They are kept verbatim.
# NOTE(review): the usage sample calls ``sitemap.urlsets('products')``, but
# ``urlsets`` is a list attribute and the factory method is named ``urlset``.
# The sample's URL string literals are also empty - presumably stripped.
class Sitemap(object):
Sitemap index generator.
Sitemap index contains no urlset itself,
they will be stored into separate urlset files created by ``Urlset`` class.
Urls assumption: sitemap and its urlset files are in the same catalog,
e.g. /www/ and /www/
Usage sample:
with Sitemap('/tmp', '') as sitemap:
with sitemap.urlsets('products') as urlset:
for i in range(100000):
urlset.add_url('' % i, priority=0.8, changefreq='daily')
with sitemap.urlsets('promo') as urlset:
urlset.add_url('', priority=0.5, changefreq='weekly')
# path: directory that receives sitemap.xml and is passed on to each Urlset.
# base_url: prefix joined with each urlset file name to build its <loc>.
# compress / indent: stored and forwarded to Urlset (indent also to the XML writer).
def __init__(self, path, base_url, compress=False, indent=False):
self.path = path
# Urlset objects whose ``files`` lists are enumerated by flush().
self.urlsets = []
self.base_url = base_url
self.compress = compress
self.indent = indent
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
# NOTE(review): the branch body and the rest of this method are missing.
# Presumably the index is flushed only on a clean exit - TODO confirm.
if exc_type:
# Builds a Urlset that shares this sitemap's path, compress and indent settings.
def urlset(self, urlset_name):
generator = Urlset(self.path, urlset_name, self.compress, self.indent)
# NOTE(review): as shown, ``generator`` is never added to self.urlsets, so
# flush() would have nothing to list. A registration line was presumably
# lost - verify against the original.
return generator
# Writes the index file: a ``sitemapindex`` root holding, for every file name
# in every registered urlset, a ``sitemap`` container with ``loc`` and ``lastmod``.
def flush(self):
# The same date string (current local date, YYYY-MM-DD) is used for every entry.
today = strftime('%Y-%m-%d')
with AtomicFile(os.path.join(self.path, 'sitemap.xml')) as f:
# NOTE(review): the namespace URI is an empty string here - presumably stripped
# from this copy; confirm the intended value.
with elementflow_xml(f, 'sitemapindex', namespaces={'': ''}, indent=self.indent) as xml:
for urlset in self.urlsets:
for fname in urlset.files:
with xml.container('sitemap'):
# File names are resolved against base_url to form the entry's location.
xml.element('loc', text=urljoin(self.base_url, fname))
xml.element('lastmod', text=today)
# Writer for the urlset files referenced from the sitemap index. Each ``url``
# entry carries ``loc``, ``priority``, ``changefreq`` and ``lastmod`` elements.
# NOTE(review): this copy is badly damaged - indentation and docstring quotes
# are gone, several statements are truncated and several branch bodies are
# missing. Each damaged spot is flagged below; restore from the original
# source before running.
# NOTE(review): "unvoked" in the description below is presumably a typo for
# "invoked"; the text is kept verbatim.
class Urlset(object):
Sitemap urlset generator.
To be unvoked via ``Sitemap``.
Creates one or more urlset files to keep less than 30K urls per file.
# NOTE(review): the trailing `` = urlset_name`` has lost its assignment target.
# Presumably it stored the urlset name on the instance for use in file names -
# TODO confirm the attribute name.
def __init__(self, path, urlset_name, compress=False, indent=False, threshold=30000): = urlset_name
self.path = path
self.compress = compress
self.indent = indent
# Names of the urlset files; their count drives the sequence suffix in _new_file().
self.files = []
# NOTE(review): the next line looks like two statements fused together (the
# threshold assignment and a separate date attribute). As written, it would
# rebind ``threshold`` and ``self.threshold`` to the date string.
self.threshold = threshold = strftime('%Y-%m-%d')
# Per-file state, populated by _new_file().
self._fname = None
self._f = None
self._url_count = None
self._xml = None
# Starts a new urlset file: resets the URL counter, derives the file name,
# opens the file and enters the XML writer.
def _new_file(self):
self._url_count = 0
n = len(self.files)
# No suffix while ``files`` is empty, otherwise ``_<n>``.
seq = '_%d' % n if n else ''
trail = '.gz' if self.compress else ''
# NOTE(review): the first tuple element is missing (presumably the urlset name).
self._fname = 'sitemap_%s%s.xml%s' % (, seq, trail)
# NOTE(review): an ``else:`` appears to be missing between the two assignments;
# as written the second one would always replace the first.
if self.compress:
self._f = AtomicFileGz(os.path.join(self.path, self._fname))
self._f = AtomicFile(os.path.join(self.path, self._fname))
# I'm going to simulate ``with``.
self._xml = elementflow_xml(self._f, 'urlset', namespaces={'': ''}, indent=self.indent).__enter__()
# NOTE(review): nothing visible here records self._fname in self.files, and no
# visible line makes the first call to _new_file() - verify against the original.
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
# NOTE(review): the branch body and the rest of this method are missing.
if exc_type:
# Appends one ``url`` entry. ``lastmod`` is formatted as YYYY-MM-DD when given.
def add_url(self, link, lastmod=None, priority=0.5, changefreq='weekly'):
# NOTE(review): the fallback operand after ``else`` is missing (presumably a
# stored current-date string) - TODO confirm.
dt = lastmod.strftime('%Y-%m-%d') if lastmod is not None else
with self._xml.container('url'):
self._xml.element('loc', text=link)
self._xml.element('priority', text=str(priority))
self._xml.element('changefreq', text=changefreq)
self._xml.element('lastmod', text=dt)
self._url_count += 1
# New urlset file to be created when the threshold is surpassed.
# NOTE(review): the branch body is missing. Also note the check is ``>``, so a
# file receives threshold + 1 entries before this condition is true.
if self._url_count > self.threshold:
# Finishes the current file by leaving the XML writer's context.
def flush(self, exit=False):
# NOTE(review): the branch body is missing (presumably an early return when
# nothing has been written) - verify.
if not self._url_count:
# Simulating end of ``with`` clause.
self._xml.__exit__(None, None, None)
# NOTE(review): no visible line closes self._f, and this branch body is missing
# (presumably it starts the next file unless called on exit) - verify.
if not exit:
def print_stats(filename, stats):
    """Print a one-line summary of the counters gathered for ``filename``.

    The line has the form ``<filename>: key=count key=count ...`` with the
    pairs emitted in the mapping's own iteration order.

    Args:
        filename: Label printed before the colon.
        stats: Mapping of counter name to integer count.
    """
    pairs = []
    for key in stats:
        pairs.append('%s=%d' % (key, stats[key]))
    summary = ' '.join(pairs)
    print(u'%s: %s' % (filename, summary))
def normalize_url(url, base=''):
    """Return an absolute form of ``url``.

    A URL that already starts with ``http://`` or ``https://`` is returned
    unchanged; anything else is resolved against ``base`` with ``urljoin``.

    The scheme is tested as a prefix rather than a substring, so a relative
    URL that merely embeds an absolute one (for example in a query string,
    ``/go?u=http://other``) is still resolved against ``base``.

    Args:
        url: Absolute or relative URL.
        base: Base URL that relative URLs are resolved against.

    Returns:
        The absolute URL as a string.
    """
    if url.startswith(('http://', 'https://')):
        return url
    return urljoin(base, url)
# Script entry point: builds a gzip-compressed sitemap under MEDIA_PATH/xml
# with two urlsets ('products' and 'additional') and prints per-category counts.
# NOTE(review): indentation was lost in this copy and two statements are
# truncated (the Connection call and the CSV loop header); both are flagged below.
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Sitemap generator')
parser.add_argument('-D', '--datadir', help='Path to CSV or YAML files',
dest='datadir', type=str, action='store', default='csv')
args = parser.parse_args()
# NOTE(review): the first positional argument is missing (presumably the
# database host taken from DATABASE) - TODO confirm.
db = Connection(, DATABASE.database, user=DATABASE.user, password=DATABASE.password, time_zone=DATABASE.time_zone)
# Tallies of how many URLs were written, by category plus a running total.
stats = Counter()
xml_path = os.path.join(MEDIA_PATH, 'xml')
# NOTE(review): the base URL is an empty string here - presumably stripped from
# this copy; with it empty, index entries would be bare file names.
xml_url = ''
with Sitemap(xml_path, xml_url, compress=True) as sitemap:
# Urlset 1: one entry per active product SKU.
with sitemap.urlset('products') as urlset:
for product in Product.list_active_skus(db):
urlset.add_url(normalize_url('/product/%d' % product.sku), priority=0.9, changefreq='weekly')
stats.update(product=1, total=1)
# Urlset 2: public sitemap links, active flat pages and static links from CSV.
with sitemap.urlset('additional') as urlset:
for link in SitemapLink.get_all(db):
if link.is_public:
urlset.add_url(normalize_url(link.url), priority=0.8, changefreq='weekly')
stats.update(additional=1, total=1)
for link in Flatpage.get_all(db, active=True):
urlset.add_url(normalize_url('/page/' + link.safe_path), priority=0.8, changefreq='weekly')
stats.update(flatpage=1, additional=1, total=1)
# NOTE(review): the iterable expression is truncated. Presumably it opened
# 'sitemap_static.csv' from args.datadir in text mode - verify.
for link in, 'sitemap_static.csv'), 'rt', newline=''):
# Blank lines in the static list are skipped.
link = link.strip()
if link:
urlset.add_url(normalize_url(link), priority=0.5, changefreq='monthly')
stats.update(static=1, additional=1, total=1)
print_stats(os.path.join(xml_path, 'sitemap.xml'), stats)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment