Skip to content

Instantly share code, notes, and snippets.

@duker33
Last active June 7, 2019 11:53
Show Gist options
  • Save duker33/680082d893aba91995ad9082832e8ffe to your computer and use it in GitHub Desktop.
Save duker33/680082d893aba91995ad9082832e8ffe to your computer and use it in GitHub Desktop.
Import the old STB data into the current DB
from contextlib import contextmanager
from functools import lru_cache
import json
import logging
import random
import string
import typing
from itertools import groupby
from django.db import models, transaction
from django.contrib.redirects.models import Redirect
from django.contrib.sites.models import Site
from django.shortcuts import reverse
from pages import models as pages_models
from stroyprombeton import models as stb_models
def randomize_slug(slug: str, hash_size: int) -> str:
    """
    Append a random lowercase ASCII suffix to the slug.

    :param slug: The base slug.
    :param hash_size: Count of random letters in the suffix.
    :return: `<slug>_<suffix>`. The suffix is empty when `hash_size` is 0.
    """
    slug_hash = ''.join(random.choices(string.ascii_lowercase, k=hash_size))
    return f'{slug}_{slug_hash}'
logger = logging.getLogger(__name__)


def log(*args):
    """
    Print the given values to stdout, space-separated.

    The module-level `logger` is configured above, but output
    goes through `print` here.
    """
    print(*args)
# One record as it is presented in the JSON files: string keys mapped to
# strings, ints, nested records or lists of nested records.
DataItem = typing.Dict[
    str,
    typing.Union[str, int, 'DataItem', typing.List['DataItem']],
]
class DataType:
    """
    Base wrapper around one record parsed from a JSON file.

    `PLAIN` names the fields copied from the record as is.
    `RELATED` names the attributes that hold nested wrappers;
    they are serialized with their own `to_data()`.
    """

    PLAIN = []
    RELATED = []

    def __init__(self, item: DataItem):
        for name in self.PLAIN:
            setattr(self, name, item[name])

    def to_data(self) -> dict:
        """Serialize the plain fields first, then the related objects."""
        data = {name: getattr(self, name) for name in self.PLAIN}
        data.update(
            (name, getattr(self, name).to_data()) for name in self.RELATED
        )
        return data
class ListDataType:
    """Base wrapper around a list of `DataType` records."""

    items: typing.List[DataType] = None

    def to_data(self) -> list:
        """Serialize every item with its own `to_data()`."""
        return [item.to_data() for item in self.items]

    def count(self):
        """Return the number of wrapped items."""
        return len(self.items)

    def __iter__(self):
        yield from self.items

    def save(self) -> typing.List[models.Model]:
        """
        Save every item yielded by iterating `self`.

        Iteration goes through `__iter__`, so a subclass that overrides it
        also narrows what gets saved.

        :return: The truthy results of the items' `save()` calls.
        """
        results = []
        for item in self:
            results.append(item.save())
        return [result for result in results if result]
class Page(DataType):
    """Page data nested in a category or product record."""

    PLAIN = [
        'h1', 'slug', 'content', 'position',
        'description', 'keywords', 'seo_text',
    ]

    h1 = slug = content = position = None
    description = keywords = seo_text = None

    def to_model(self, parent: pages_models.ModelPage) -> pages_models.ModelPage:
        """
        Create a `ModelPage` from the plain fields.

        :param parent: Passed to the new page as its `parent`.
        :return: The created page.
        """
        fields = {name: getattr(self, name) for name in self.PLAIN}
        return pages_models.ModelPage.objects.create(parent=parent, **fields)
class TagGroup(DataType):
    """Tag group resolved from the name of an option field."""

    # option field name -> tag group name
    MAP = {
        'length': 'Длина',
        'width': 'Ширина',
        'height': 'Высота',
        'weight': 'Масса',
        'volume': 'Объём',
        'diameter_out': 'Внешний диаметр',
        'diameter_in': 'Внутренний диаметр',
        'specification': 'Рабочая документация',
    }
    PLAIN = ['name']

    def __init__(self, field: str):
        # The parent constructor is skipped on purpose:
        # the group is built from a field name, not from a JSON record.
        self.name = self.MAP[field]

    def fetch(self) -> stb_models.TagGroup:
        """Get the group with this name from the DB."""
        groups = stb_models.TagGroup.objects
        return groups.get(name=self.name)
class Tag(DataType):
    """
    Tag built from a single option field and its value.

    NOTE(review): `PLAIN` lists `id`, `position` and `slug`, but `__init__`
    never sets them, so the inherited `to_data()` would fail for a tag.
    """

    PLAIN = ['id', 'name', 'position', 'slug']
    RELATED = ['group']

    # option field name -> suffix appended to the tag name on save
    DIMENSIONS = dict(
        length='Длина', width='Ширина', height='Высота',
        weight='Масса', volume='Объём',
        diameter_out='Внешний диаметр', diameter_in='Внутренний диаметр',
        specification='Рабочая документация',
    )

    def __init__(self, field: str, value: str):
        """
        :param field: Name of the option field; must be a key of `TagGroup.MAP`.
        :param value: Value of the field; stored as a string.
        """
        self.name = str(value)
        self.group = TagGroup(field)
        self.field = field

    def save(self, option: stb_models.Option) -> stb_models.Tag:
        """
        Get or create the tag in the DB and bind it to the option.

        :param option: Option model to add to `tag.options`
            (presumably already saved; verify against the caller).
        :return: The tag model.
        :raises Exception: Whatever `get_or_create` raised, re-raised
            unchanged after the diagnostics have been logged.
        """
        group = self.group.fetch()
        try:
            tag, _ = stb_models.Tag.objects.get_or_create(
                name=f'{self.name} {self.DIMENSIONS[self.field]}',
                group=group,
            )
        except Exception:
            # Diagnostics only: look for an existing tag with the same slug.
            # NOTE(review): the probe is named without the dimension suffix
            # used above, so its slug may differ from the failed one.
            probe = stb_models.Tag(name=self.name, group=group)
            original = stb_models.Tag.objects.filter(slug=probe._get_slug()).first()
            if original:
                log(f'original {original.group.name}, {original.name}, {original.slug}')
            else:
                # Without this guard the handler itself would fail
                # with AttributeError and hide the real error.
                log('original tag with the same slug is not found')
            log(f'trying {self.group.name}, {self.name}')
            raise
        tag.options.add(option)
        tag.save()
        return tag
class Tags(ListDataType):
    """Tags of one option: one `Tag` per field listed in `PLAIN`."""

    PLAIN = [
        'specification',
        'length', 'width', 'height',
        'weight', 'volume',
        'diameter_out', 'diameter_in'
    ]

    def __init__(self, option: DataItem):
        tags = []
        for name in self.PLAIN:
            tags.append(Tag(field=name, value=option[name]))
        self.items: typing.List[Tag] = tags

    def save(self, option: stb_models.Option):
        """Save every tag, passing the given option model to each of them."""
        for item in self.items:
            item.save(option)
class Option(DataType):
    """
    Option of a grouped product, built from an old product record.

    `product` holds the id of the head product of the group,
    `old_product` holds the id of the old product the option was built from.
    """

    TAGS = [
        'specification',
        'length', 'width', 'height',
        'weight', 'volume',
        'diameter_out', 'diameter_in'
    ]
    OPTION = ['mark', 'price']
    PLAIN = [*OPTION, *TAGS]
    RELATED = ['product', 'old_product', 'series', 'tags']

    code, mark, price = 3 * (None, )
    product, old_product, series, tags = 4 * (None, )

    def __init__(self, option: DataItem):
        """
        :param option: Record with the `PLAIN` fields plus integer
            `product` and `old_product` ids.
        """
        super().__init__(option)
        assert isinstance(option['old_product'], int)
        assert isinstance(option['product'], int)
        self.old_product: int = option['old_product']
        self.product: int = option['product']
        self.tags = Tags(option)

    @classmethod
    def from_product(cls, product: 'Product', head: 'Product'):
        """
        Build an option from an old product.

        :param product: The old product that provides the option fields.
        :param head: The main product of the group.
        """
        return cls({
            **{'product': head.id, 'old_product': product.id},
            **{f: getattr(product, f) for f in cls.PLAIN}
        })

    @property
    def old_url(self) -> str:
        """URL of the old product page this option was built from."""
        return reverse('product', args=(self.old_product, ))

    # Not wrapped with `lru_cache(maxsize=1)` anymore: the single cache slot
    # was shared by all instances, so it never hit while iterating options.
    def to_model(self, product: stb_models.Product) -> stb_models.Option:
        """
        Build a new, unsaved `stb_models.Option` for the given product.

        :param product: Product model the option belongs to.
        :return: The unsaved option with the extra `url_` attribute set.
        """
        option = stb_models.Option(
            product=product,
            **{f: getattr(self, f) for f in self.OPTION}
        )
        # patch because `Option.url` is not really implemented.
        # NOTE(review): the model is only instantiated here, not saved,
        # so `option.id` is presumably still None inside this URL.
        option.url_ = product.url + f'?option_id={option.id}'
        return option
class Options(ListDataType):
    """Options set of one grouped product."""

    def __init__(self, options: typing.List[DataItem]):
        self.items: typing.List[Option] = [Option(o) for o in options]

    @classmethod
    def from_options(cls, options: typing.List[Option]):
        """Wrap already built `Option` objects without parsing them again."""
        result = cls([])
        result.items = options
        return result

    def save(self, product: stb_models.Product):
        """
        Save the options, their tags and the redirects from old product URLs.

        :param product: The product model related to the current options set.
        """
        saved = stb_models.Option.objects.bulk_create([
            o.to_model(product) for o in self.items
        ])
        for data, model in zip(self.items, saved):
            assert data.code == model.code
            assert data.price == model.price
            assert data.mark == model.mark
            data.tags.save(model)

        # One query for the whole batch instead of one per redirect.
        site = Site.objects.first()
        Redirect.objects.bulk_create([
            Redirect(
                old_path=data.old_url,
                # Built from the saved model: the `url_` set by `to_model`
                # is composed before the option gets its id.
                # Relies on `bulk_create` setting primary keys on the
                # returned objects, as the tags binding above already does.
                new_path=f'{product.url}?option_id={model.id}',
                site=site,
            ) for data, model in zip(self.items, saved)
        ])
class Parent(DataType):
    """Reference to a parent category: only its id and name."""

    PLAIN = ['id', 'name']

    id = None
    name = None
class Category(DataType):
    """
    Category record from the old data.

    Several methods read the module-level `categories` collection,
    so it must be loaded before they are called.
    """

    PLAIN = ['id', 'name', 'specification']
    RELATED = ['page', 'parent']

    id, name, specification = 3 * (None, )
    page, parent = 2 * (None, )
    # per-instance memo for the `filtered` property
    _filtered = None

    def __init__(self, category: DataItem):
        super().__init__(category)
        self.page = Page(category['page'])
        self.parent = Parent(category['parent'])

    @staticmethod
    def from_parent(parent: Parent) -> 'Category':
        """
        Find the full category for a short parent reference.

        :raises AssertionError: If `categories` has not exactly one
            category with the parent's id.
        """
        similar = [c for c in categories.items if c.id == parent.id]
        assert len(similar) == 1, [s.name for s in similar]
        return similar[0]

    def is_second_level(self):
        """Tell if the parent of the category is a root category."""
        roots = [c.id for c in categories.get_roots()]
        return self.parent.id in roots

    def is_root(self):
        """Tell if the category is one of the root categories."""
        roots = [c.id for c in categories.get_roots()]
        return self.id in roots

    @property
    def url(self) -> str:
        return reverse('category', args=(self.id,))

    @property
    def filtered(self):
        """
        The nearest category, starting from this one and going up
        through the parents, whose name does not contain 'Серия'.

        Memoized per instance. The previous `lru_cache(maxsize=1)` kept
        a single slot shared by every category, so it was evicted as soon
        as another category was asked.
        """
        if self._filtered is None:
            category = self
            while 'Серия' in category.name:
                category = Category.from_parent(category.parent)
            self._filtered = category
        return self._filtered

    def save(self) -> typing.Union[stb_models.Category, None]:
        """
        Create the category with its page in the DB if it's not there yet.

        Root categories are skipped. Second level categories are created
        without a parent. For deeper ones the parent is taken from the DB
        by name or saved first.

        :return: The existing or the created category; None for a root.
        """
        if self.is_root():
            return

        category = stb_models.Category.objects.filter(name=self.name).first()
        if category:
            assert category.page.slug == self.page.slug
        else:
            assert not pages_models.Page.objects.filter(name=self.name), \
                f'Have no category, but have page. Name is {self.name}'
            if not self.is_second_level():
                parent = stb_models.Category.objects.filter(name=self.parent.name).first()
                if not parent:
                    parent = Category.from_parent(self.parent).save()
                assert parent, self.parent.name
            else:
                parent = None
            page = self.page.to_model(parent.page if parent else None)
            page.related_model_name = 'stroyprombeton_category'
            page.type = 'model'
            page.save()
            category = (
                stb_models.Category.objects
                .create(name=self.name, page=page, parent=parent)
            )
        return category
class Product(DataType):
    """
    Product record from the old data.

    After `from_group` the head product of a group also carries
    the `options` built from every product of the group.
    """

    PLAIN = [
        'id', 'name',
        'price', 'mark',
        # tags content
        'specification',
        'length', 'width', 'height',
        'weight', 'volume',
        'diameter_out', 'diameter_in'
    ]
    RELATED = ['page', 'category', 'tags']

    id, name, price, mark, specification = 5 * (None, )
    page, category, tags = 3 * (None, )
    # Filled by `from_group`, not by the constructor.
    options = None

    def __init__(self, product: DataItem):
        super().__init__(product)
        self.page = Page(product['page'])
        self.category = Category(product['category'])

    @property
    def url(self) -> str:
        return reverse('product', args=(self.id,))

    @classmethod
    def from_group(cls, group: typing.Iterable['Product']) -> 'Product':
        """
        Merge a group of products into its first product.

        :param group: Non-empty iterable of products; the first one
            becomes the head and gets an option per product of the group,
            itself included.
        :return: The head product with `options` attached.
        """
        group = list(group)
        product = group[0]
        product.options = Options.from_options(
            [Option.from_product(p, product) for p in group]
        )
        assert all(isinstance(o.product, int) for o in product.options)
        assert all(o.product == product.id for o in product.options)
        return product

    def save(self) -> stb_models.Product:
        """
        Create the product with its page and options if it's not in the DB.

        The product is searched by its name and the name of its
        filtered category. Options are saved only for a created product.

        :return: The existing or the created product.
        """
        category_name = self.category.filtered.name
        product = (
            stb_models.Product.objects
            .filter(name=self.name, category__name=category_name)
            .first()
        )
        if not product:
            # NOTE(review): the page is looked up under the raw category
            # name, while the product is searched and created under
            # the filtered one. Confirm this is intended.
            assert not \
                (
                    pages_models.Page.objects
                    .filter(name=self.name, parent__name=self.category.name)
                ), \
                f'Have no product, but have page. Name is {self.name}'
            # Fetched once: the same query used to run again
            # right before the product creation.
            category = stb_models.Category.objects.get(name=category_name)
            page = self.page.to_model(parent=category.page)
            page.related_model_name = 'stroyprombeton_product'
            page.type = 'model'
            page.save()
            product = (
                stb_models.Product.objects
                .create(name=self.name, page=page, category=category)
            )
            self.options.save(product)
        return product
class Products(ListDataType):
    """Old products merged into groups, one head product per group."""

    def __init__(self, products: typing.List[DataItem]):
        def group_key(product):
            return product.name, product.category.filtered.name

        # `groupby` merges only adjacent items,
        # so the products are sorted by the same key first.
        parsed = sorted((Product(p) for p in products), key=group_key)
        grouped = groupby(parsed, key=group_key)
        self.items: typing.List[Product] = [
            Product.from_group(group) for _, group in grouped
        ]

    def options_count(self):
        """Return the total count of options over all the products."""
        return sum(product.options.count() for product in self.items)
class Categories(ListDataType):
    """
    All categories from the old data, sorted by (parent id, id).

    Iteration and `count()` skip the categories with 'Серия' in the name,
    while `items` keeps all of them.
    """

    def __init__(self, categories: typing.List[DataItem]):
        items = [Category(c) for c in categories]
        self.items: typing.List[Category] = sorted(items, key=lambda c: (c.parent.id or 0, c.id))

    @lru_cache(maxsize=1)
    def get_roots(self) -> typing.List[Category]:
        """Return the categories whose parent has no id."""
        return [c for c in self.items if not c.parent.id]

    def count(self):
        return len([i for i in self.items if 'Серия' not in i.name])

    def __iter__(self) -> typing.Iterator[Category]:
        for i in self.items:
            if 'Серия' not in i.name:
                yield i

    def save(self):
        """
        Save the categories and create redirects for the missing ones.

        A redirect is created for every old category, the 'Серия' ones
        included, whose id is absent in the DB after saving. It leads
        to the URL of the category found by the filtered name.

        :return: The saved category models, as the base class returns them.
        """
        saved = super().save()
        # Hoisted out of the loop: one ids query turned into a set
        # and one site query for the whole batch.
        ids = set(stb_models.Category.objects.values_list('id', flat=True))
        site = Site.objects.first()
        Redirect.objects.bulk_create([
            Redirect(
                old_path=category.url,
                new_path=(
                    stb_models.Category.objects
                    .get(name=category.filtered.name)
                ).url,
                site=site,
            ) for category in self.items if category.id not in ids
        ])
        return saved
@contextmanager
def non_persistence():
    """
    Run the wrapped block inside a transaction and roll its changes back.

    A savepoint is taken before the block and rolled back right after
    the block finishes without an exception.
    """
    with transaction.atomic():
        savepoint_id = transaction.savepoint()
        yield
        transaction.savepoint_rollback(savepoint_id)
@contextmanager
def log_count(klass):
    """
    Log how the rows count of a model changes over the wrapped block.

    :param klass: Class with an `objects.count()` manager method.
        Its lowercased name plus "s" is used as the label.
    """
    rows_before = klass.objects.count()
    yield
    rows_after = klass.objects.count()
    label = klass.__name__.lower()
    log(f'{label}s {rows_before} -> {rows_after}')
log('--- parse json ---')
# The names in the data are Cyrillic, so the files are read as UTF-8
# explicitly instead of relying on the locale encoding.
# `categories` must be loaded first: products grouping reads it
# through `Category.filtered`.
with open('categories.json', 'r', encoding='utf-8') as file:
    categories = Categories(json.load(file))
with open('products.json', 'r', encoding='utf-8') as file:
    products = Products(json.load(file))

# Sanity checks of the parsed data. Sets give constant time lookups.
category_ids = {c.id for c in categories}
assert all('Серия' not in p.category.filtered.name for p in products)
assert all(p.category.filtered.id in category_ids for p in products)
product_ids = {p.id for p in products}
assert all(
    o.product in product_ids
    for p in products for o in p.options
)

log('--- save data to the DB ---')
# Everything below runs under `non_persistence`, so it is rolled back.
# The counts are logged before the rollback happens.
with non_persistence(), log_count(Redirect):
    with log_count(stb_models.Category):
        categories.save()
    stb_models.TagGroup.objects.create(name='Рабочая документация')
    with \
            log_count(stb_models.Tag), \
            log_count(stb_models.Option), \
            log_count(stb_models.Product):
        products.save()
log('--- finished ---')
@duker33
Copy link
Author

duker33 commented Jun 3, 2019

appended options to grouped products.
Output from the local app:

('categories count', 554)
('products count', 533)
('options count', 14381)

@duker33
Copy link
Author

duker33 commented Jun 3, 2019

explored fields merging: it's not required.

Started to save fetched models to the DB

@duker33
Copy link
Author

duker33 commented Jun 4, 2019

saved categories and products to DB.
Options, tags and Series left

@duker33
Copy link
Author

duker33 commented Jun 5, 2019

  • save options to the DB
  • create tags from option's fields
  • save tags to the DB

@duker33
Copy link
Author

duker33 commented Jun 5, 2019

We have this output

categories 191 | 554 -> 554
products 710 | 533 -> 768
options 12197 -> 13611
tags 4073 -> 5266

products 710 | 533 -> 768 means that

  • there were 710 products in the current production DB
  • there were 533 products in the old DB
  • the new production DB has 768 products

@duker33
Copy link
Author

duker33 commented Jun 6, 2019

  • group products by (category, name) instead of name
  • exclude series categories

@duker33
Copy link
Author

duker33 commented Jun 6, 2019

exclude series categories before products grouping

@duker33
Copy link
Author

duker33 commented Jun 7, 2019

  • save only non-filtered categories
  • fix products filtering

Output with models count:

categories 191 |  193 -> 196
products 710 | 818 -> 818
options 12197 -> 14132
tags 4073 -> 5699

@duker33
Copy link
Author

duker33 commented Jun 7, 2019

  • created redirects for categories
  • created redirects from old products to new options

@duker33
Copy link
Author

duker33 commented Jun 7, 2019

categorys 191 -> 196
products 710 -> 818
options 12197 -> 14132
tags 4073 -> 5699
redirects 197 -> 2495

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment