Skip to content

Instantly share code, notes, and snippets.

@sashka
Last active July 9, 2019 21:48
Show Gist options
  • Save sashka/f764e112b12a93eb8a539823298c9b07 to your computer and use it in GitHub Desktop.
Save sashka/f764e112b12a93eb8a539823298c9b07 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# encoding: utf-8
from __future__ import print_function
import argparse
import gzip
import io
import os
from time import strftime
from urlparse import urljoin
from atomicfile import AtomicFile
from collections import Counter
from vitrina.frontend.settings import DATABASE, MEDIA_PATH
from vitrina.storage import Connection
from vitrina.storage.product import Product
from vitrina.storage.flatpage import Flatpage
from vitrina.storage.seo import SitemapLink
from vitrina.third_party.elementflow import xml as elementflow_xml
class AtomicFileGz(object):
    """Gzip-compressing writer layered on top of ``AtomicFile``.

    Everything passed to ``write`` goes through a ``gzip.GzipFile`` whose
    output target is an ``AtomicFile`` opened for the same ``name``.
    Usable as a context manager: a clean exit closes the file, an exit
    caused by an exception leaves it untouched.
    """

    def __init__(self, name, mode="w+b", createmode=None):
        """Open ``name`` through ``AtomicFile`` and wrap it in a gzip stream.

        ``name``, ``mode`` and ``createmode`` are forwarded to ``AtomicFile``;
        ``name`` and ``mode`` are also handed to ``gzip.GzipFile``.
        """
        target = AtomicFile(name, mode, createmode)
        self._f = target
        self._gz = gzip.GzipFile(name, mode, fileobj=target)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Only a clean exit finalises the file; on error nothing is closed here.
        if not exc_type:
            self.close()

    def write(self, string):
        """Compress ``string`` and append it to the underlying file."""
        self._gz.write(string)

    def close(self):
        """Finish the gzip stream first, then close the underlying file."""
        # Order matters: GzipFile.close() writes its trailer into the file
        # object but does not close that object itself.
        for stream in (self._gz, self._f):
            stream.close()
class Sitemap(object):
    """
    Sitemap index generator.

    Sitemap index contains no urlset itself,
    they will be stored into separate urlset files created by ``Urlset`` class.

    Urls assumption: sitemap and its urlset files are in the same catalog,
    e.g. /www/example.com/xml/sitemap.xml and /www/example.com/xml/sitemap_products.xml

    ``base_url`` is the public URL of that catalog; a missing trailing slash
    is tolerated.

    Usage sample:

        with Sitemap('/tmp', 'http://example.com/tmp') as sitemap:
            with sitemap.urlset('products') as urlset:
                for i in range(100000):
                    urlset.add_url('http://example.com/article/%d' % i, priority=0.8, changefreq='daily')
            with sitemap.urlset('promo') as urlset:
                urlset.add_url('http://example.com/promo', priority=0.5, changefreq='weekly')
    """

    def __init__(self, path, base_url, compress=False, indent=False):
        """Remember where files are written and under which URL they are served.

        ``path`` is the directory for ``sitemap.xml`` and the urlset files,
        ``base_url`` the URL that directory is published under; ``compress``
        and ``indent`` are forwarded to every ``Urlset`` created via ``urlset``.
        """
        self.path = path
        self.urlsets = []
        self.base_url = base_url
        self.compress = compress
        self.indent = indent

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # The index is written only when the ``with`` body finished cleanly.
        if exc_type:
            return
        self.flush()

    def urlset(self, urlset_name):
        """Create, register and return a new ``Urlset`` named ``urlset_name``."""
        generator = Urlset(self.path, urlset_name, self.compress, self.indent)
        self.urlsets.append(generator)
        return generator

    def _absolute_url(self, fname):
        """Return the public URL of ``fname`` located directly under ``base_url``."""
        base = self.base_url
        if base and not base.endswith('/'):
            # Without the trailing slash urljoin() would replace the last path
            # segment of the base ('.../tmp' + 'a.xml' -> '.../a.xml').
            base += '/'
        return urljoin(base, fname)

    def flush(self):
        """Write ``sitemap.xml`` listing every file produced by the registered urlsets."""
        today = strftime('%Y-%m-%d')
        index_path = os.path.join(self.path, 'sitemap.xml')
        namespaces = {'': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
        with AtomicFile(index_path) as f:
            with elementflow_xml(f, 'sitemapindex', namespaces=namespaces, indent=self.indent) as xml:
                for urlset in self.urlsets:
                    for fname in urlset.files:
                        with xml.container('sitemap'):
                            xml.element('loc', text=self._absolute_url(fname))
                            xml.element('lastmod', text=today)
class Urlset(object):
    """
    Sitemap urlset generator.

    To be invoked via ``Sitemap``.

    Creates one or more urlset files, each holding at most ``threshold`` urls
    (30K by default). Names of the finished files are collected in ``files``.
    No file is opened until the first url is added, so an urlset that never
    receives a url produces no file at all.
    """

    def __init__(self, path, urlset_name, compress=False, indent=False, threshold=30000):
        """Set up the generator.

        ``path`` is the output directory, ``urlset_name`` becomes part of the
        file name, ``compress`` switches to gzip output (``.gz`` suffix),
        ``indent`` is forwarded to the XML writer and ``threshold`` is the
        maximum number of urls per file.
        """
        self.name = urlset_name
        self.path = path
        self.compress = compress
        self.indent = indent
        self.files = []
        self.threshold = threshold
        self.today = strftime('%Y-%m-%d')
        # State of the file currently being written; ``None`` means no file is open.
        self._fname = None
        self._f = None
        self._url_count = 0
        self._xml = None

    def _new_file(self):
        """Open the next urlset file and start its ``urlset`` root element."""
        self._url_count = 0
        # The first file carries no sequence suffix, later ones get _1, _2, ...
        n = len(self.files)
        seq = '_%d' % n if n else ''
        trail = '.gz' if self.compress else ''
        self._fname = 'sitemap_%s%s.xml%s' % (self.name, seq, trail)
        full_path = os.path.join(self.path, self._fname)
        if self.compress:
            self._f = AtomicFileGz(full_path)
        else:
            self._f = AtomicFile(full_path)
        # I'm going to simulate ``with``.
        self._xml = elementflow_xml(
            self._f, 'urlset',
            namespaces={'': 'http://www.sitemaps.org/schemas/sitemap/0.9'},
            indent=self.indent).__enter__()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # The current file is finalised only when the ``with`` body finished cleanly.
        if exc_type:
            return
        self.flush(exit=True)

    def add_url(self, link, lastmod=None, priority=0.5, changefreq='weekly'):
        """Append one ``url`` entry.

        ``lastmod`` is formatted with ``strftime('%Y-%m-%d')``; when it is
        ``None`` the date captured at construction time is used.
        """
        if self._xml is None:
            # Lazily open a file so that nothing is left open without content.
            self._new_file()
        dt = lastmod.strftime('%Y-%m-%d') if lastmod is not None else self.today
        with self._xml.container('url'):
            self._xml.element('loc', text=link)
            self._xml.element('priority', text=str(priority))
            self._xml.element('changefreq', text=changefreq)
            self._xml.element('lastmod', text=dt)
        self._url_count += 1
        # Close the file as soon as it is full; the next url opens a new one.
        if self._url_count >= self.threshold:
            self.flush()

    def flush(self, exit=False):
        """Finish the file currently being written, if it holds any url.

        ``exit`` is kept for backward compatibility; since files are opened
        lazily there is no follow-up file to open either way.
        """
        if not self._url_count:
            return
        # Simulating end of ``with`` clause.
        self._xml.__exit__(None, None, None)
        self._f.close()
        self.files.append(self._fname)
        self._xml = None
        self._f = None
        self._url_count = 0
def print_stats(filename, stats):
    """Print ``filename`` followed by space-separated ``key=count`` pairs of ``stats``."""
    pairs = []
    for key in stats:
        pairs.append('%s=%d' % (key, stats[key]))
    summary = ' '.join(pairs)
    print(u'%s: %s' % (filename, summary))
def normalize_url(url, base='http://example.com/'):
    """Return ``url`` as an absolute URL, resolving relative ones against ``base``.

    A URL that already starts with ``http://`` or ``https://`` is returned
    unchanged. Only the prefix is inspected, so a relative URL that merely
    contains ``http://`` somewhere (for instance in its query string) is
    still resolved against ``base``.
    """
    if url.startswith(('http://', 'https://')):
        return url
    return urljoin(base, url)
if __name__ == '__main__':
    # Command line: only the directory holding ``sitemap_static.csv`` is configurable.
    parser = argparse.ArgumentParser(description='Sitemap generator')
    parser.add_argument('-D', '--datadir', help='Path to CSV or YAML files',
                        dest='datadir', type=str, action='store', default='csv')
    args = parser.parse_args()

    db = Connection(DATABASE.host, DATABASE.database, user=DATABASE.user,
                    password=DATABASE.password, time_zone=DATABASE.time_zone)
    # Per-category counters, printed once the sitemap has been written.
    stats = Counter()
    xml_path = os.path.join(MEDIA_PATH, 'xml')
    xml_url = 'http://example.com/xml/'

    with Sitemap(xml_path, xml_url, compress=True) as sitemap:
        # Urlset 1: one entry per active product SKU.
        with sitemap.urlset('products') as urlset:
            for product in Product.list_active_skus(db):
                urlset.add_url(normalize_url('/product/%d' % product.sku), priority=0.9, changefreq='weekly')
                stats.update(product=1, total=1)

        # Urlset 2: public SEO links, active flat pages and static links from CSV.
        with sitemap.urlset('additional') as urlset:
            for link in SitemapLink.get_all(db):
                if link.is_public:
                    urlset.add_url(normalize_url(link.url), priority=0.8, changefreq='weekly')
                    stats.update(additional=1, total=1)

            for link in Flatpage.get_all(db, active=True):
                urlset.add_url(normalize_url('/page/' + link.safe_path), priority=0.8, changefreq='weekly')
                stats.update(flatpage=1, additional=1, total=1)

            # One link per line; blank lines are skipped. The ``with`` makes
            # sure the CSV handle is closed even if adding a URL fails.
            static_path = os.path.join(args.datadir, 'sitemap_static.csv')
            with io.open(static_path, 'rt', newline='') as static_links:
                for link in static_links:
                    link = link.strip()
                    if link:
                        urlset.add_url(normalize_url(link), priority=0.5, changefreq='monthly')
                        stats.update(static=1, additional=1, total=1)

    print_stats(os.path.join(xml_path, 'sitemap.xml'), stats)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment