Last active
June 7, 2019 11:53
-
-
Save duker33/680082d893aba91995ad9082832e8ffe to your computer and use it in GitHub Desktop.
Import the old STB data to the actual DB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import contextmanager | |
from functools import lru_cache | |
import json | |
import logging | |
import random | |
import string | |
import typing | |
from itertools import groupby | |
from django.db import models, transaction | |
from django.contrib.redirects.models import Redirect | |
from django.contrib.sites.models import Site | |
from django.shortcuts import reverse | |
from pages import models as pages_models | |
from stroyprombeton import models as stb_models | |
def randomize_slug(slug: str, hash_size: int) -> str:
    """Return `slug` followed by an underscore and a random lowercase suffix.

    :param slug: base slug to extend.
    :param hash_size: number of random ASCII lowercase letters to append.
    :return: string of the form `<slug>_<suffix>`.
    """
    suffix = ''.join(random.choices(string.ascii_lowercase, k=hash_size))
    return f'{slug}_{suffix}'
logger = logging.getLogger(__name__)


def log(*args):
    """Print the given values to stdout, separated by spaces.

    The module-level `logger` is created above, but this helper does not
    route anything through it: output goes straight to `print`.
    """
    print(*args)
# One record as it is presented in the json files: string keys mapped to
# a string, an int, a nested record or a list of nested records.
_DataItemValue = typing.Union[str, int, 'DataItem', typing.List['DataItem']]
DataItem = typing.Dict[str, _DataItemValue]
class DataType:
    """Base class for one record parsed from the json data.

    Subclasses list in `PLAIN` the keys copied verbatim from the raw item and
    in `RELATED` the attribute names that hold nested values.
    """

    PLAIN = []
    RELATED = []

    def __init__(self, item: 'DataItem'):
        """Copy every `PLAIN` key of `item` onto the instance.

        :param item: raw record; it must contain every key listed in `PLAIN`.
        :raises KeyError: if one of the `PLAIN` keys is missing from `item`.
        """
        for field in self.PLAIN:
            setattr(self, field, item[field])

    def to_data(self) -> dict:
        """Serialize the record back to a plain dict.

        `PLAIN` attributes are copied as they are. `RELATED` attributes are
        serialized with their own `to_data()` when they provide one; values
        without it (e.g. `None` or a bare id) are passed through unchanged
        instead of raising `AttributeError`.

        :return: dict with one entry per `PLAIN` and `RELATED` field.
        """
        data = {field: getattr(self, field) for field in self.PLAIN}
        for field in self.RELATED:
            related = getattr(self, field)
            serialize = getattr(related, 'to_data', None)
            data[field] = serialize() if callable(serialize) else related
        return data
class ListDataType:
    """Base class for a collection of `DataType` records stored in `items`."""

    # Subclasses in this file assign the real list in their `__init__`.
    items: typing.List[DataType] = None

    def to_data(self) -> list:
        """Return the `to_data()` result of every element of `items`."""
        serialized = []
        for record in self.items:
            serialized.append(record.to_data())
        return serialized

    def count(self):
        """Return the number of elements in `items`."""
        return len(self.items)

    def __iter__(self):
        """Iterate over `items` in order."""
        yield from self.items

    def save(self) -> typing.List[models.Model]:
        """Call `save()` on every iterated element and keep the truthy results.

        Iteration goes through `__iter__`, so a subclass overriding it also
        changes which elements get saved.
        """
        results = [record.save() for record in self]
        return [model for model in results if model]
class Page(DataType):
    """Page fields of a record; can be turned into a `pages_models.ModelPage`."""

    PLAIN = [
        'h1', 'slug', 'content', 'position',
        'description', 'keywords', 'seo_text',
    ]

    # Class-level defaults for every PLAIN field.
    h1 = None
    slug = None
    content = None
    position = None
    description = None
    keywords = None
    seo_text = None

    def to_model(self, parent: pages_models.ModelPage) -> pages_models.ModelPage:
        """Create a `ModelPage` row under `parent` filled with the PLAIN fields.

        :param parent: page to use as the parent of the created page.
        :return: the page returned by `objects.create`.
        """
        fields = {name: getattr(self, name) for name in self.PLAIN}
        return pages_models.ModelPage.objects.create(parent=parent, **fields)
class TagGroup(DataType):
    """Tag group resolved from the name of an option field."""

    # Option field name -> name of the tag group.
    MAP = {
        'length': 'Длина',
        'width': 'Ширина',
        'height': 'Высота',
        'weight': 'Масса',
        'volume': 'Объём',
        'diameter_out': 'Внешний диаметр',
        'diameter_in': 'Внутренний диаметр',
        'specification': 'Рабочая документация',
    }
    PLAIN = ['name']

    def __init__(self, field: str):
        """Store the group name mapped to `field`.

        :raises KeyError: if `field` is not a key of `MAP`.
        """
        self.name = self.MAP[field]

    def fetch(self) -> stb_models.TagGroup:
        """Return the `stb_models.TagGroup` whose name equals `self.name`."""
        group = stb_models.TagGroup.objects.get(name=self.name)
        return group
class Tag(DataType):
    """Tag built from one field of an option and the value of that field."""

    PLAIN = ['id', 'name', 'position', 'slug']
    RELATED = ['group']
    # Option field name -> label appended to the tag name on save.
    DIMENSIONS = dict(
        length='Длина', width='Ширина', height='Высота',
        weight='Масса', volume='Объём',
        diameter_out='Внешний диаметр', diameter_in='Внутренний диаметр',
        specification='Рабочая документация',
    )

    def __init__(self, field: str, value: str):
        """
        :param field: option field the value comes from; selects the group.
        :param value: field value; stored as `str(value)` in `name`.
        """
        self.name = str(value)
        self.group = TagGroup(field)
        self.field = field

    def save(self, option: stb_models.Option) -> stb_models.Tag:
        """Get or create the tag in the DB and attach `option` to it.

        The stored tag name is `<value> <dimension label>`.

        :param option: option model to add to the tag's `options` relation.
        :return: the fetched or created tag.
        :raises Exception: whatever `get_or_create` raised; it is re-raised
            after diagnostic lines are logged.
        """
        name = f'{self.name} {self.DIMENSIONS[self.field]}'
        group = self.group.fetch()
        try:
            tag, _ = stb_models.Tag.objects.get_or_create(name=name, group=group)
        except Exception:
            # Diagnostics only: look for an existing tag with the slug of the
            # tag we tried to create. The probe uses the same name as the
            # failed call, and a missing match must not hide the real error.
            probe = stb_models.Tag(name=name, group=group)
            original = stb_models.Tag.objects.filter(slug=probe._get_slug()).first()
            if original is not None:
                log(f'original {original.group.name}, {original.name}, {original.slug}')
            log(f'trying {self.group.name}, {self.name}')
            raise
        tag.options.add(option)
        tag.save()
        return tag
class Tags(ListDataType):
    """All tags of one option: one `Tag` per field listed in `PLAIN`."""

    PLAIN = [
        'specification',
        'length', 'width', 'height',
        'weight', 'volume',
        'diameter_out', 'diameter_in'
    ]

    def __init__(self, option: DataItem):
        """Build a `Tag` from every `PLAIN` key of the raw `option` record.

        :raises KeyError: if `option` lacks one of the `PLAIN` keys.
        """
        tags = []
        for field in self.PLAIN:
            tags.append(Tag(field=field, value=option[field]))
        self.items: typing.List[Tag] = tags

    def save(self, option: stb_models.Option):
        """Save every tag of `items`, attaching each one to `option`."""
        for item in self.items:
            item.save(option)
class Option(DataType):
    """Product option built from the data of an old product."""

    TAGS = [
        'specification',
        'length', 'width', 'height',
        'weight', 'volume',
        'diameter_out', 'diameter_in'
    ]
    # Fields copied to the `stb_models.Option` model by `to_model`.
    OPTION = ['mark', 'price']
    PLAIN = [*OPTION, *TAGS]
    RELATED = ['product', 'old_product', 'series', 'tags']

    code, mark, price = 3 * (None, )
    product, old_product, series, tags = 4 * (None, )
    # Last `(product, model)` pair returned by `to_model` for this instance.
    _model_cache = None

    def __init__(self, option: DataItem):
        """
        :param option: raw record with every `PLAIN` key plus integer
            `product` and `old_product` ids.
        """
        super().__init__(option)
        assert isinstance(option['old_product'], int)
        assert isinstance(option['product'], int)
        self.old_product: int = option['old_product']
        self.product: int = option['product']
        self.tags = Tags(option)

    @classmethod
    def from_product(cls, product: 'Product', head: 'Product'):
        """Build an option from the fields of `product`.

        :param product: old product whose `PLAIN` fields become the option.
        :param head: The main product at the group.
        """
        return cls({
            **{'product': head.id, 'old_product': product.id},
            **{f: getattr(product, f) for f in cls.PLAIN}
        })

    @property
    def old_url(self) -> str:
        """URL of the old product page, reversed from `old_product`."""
        return reverse('product', args=(self.old_product, ))

    def to_model(self, product: stb_models.Product) -> stb_models.Option:
        """Build (without saving) the `stb_models.Option` for `product`.

        The result is memoized per instance: repeated calls with an equal
        `product` return the same model object. A module-wide
        `lru_cache(maxsize=1)` would be shared by all instances and evicted
        on almost every call.

        :param product: product model the option belongs to.
        :return: unsaved option model with an extra `url_` attribute.
        """
        cached = self._model_cache
        if cached is not None and cached[0] == product:
            return cached[1]
        option = stb_models.Option(
            product=product,
            **{f: getattr(self, f) for f in self.OPTION}
        )
        # patch because `Option.url` is not really implemented
        # NOTE(review): the model is not saved yet at this point, so
        # `option.id` here is the id of an unsaved instance.
        option.url_ = product.url + f'?option_id={option.id}'
        self._model_cache = (product, option)
        return option
class Options(ListDataType):
    """Set of options belonging to one product."""

    def __init__(self, options: typing.List[DataItem]):
        """Build an `Option` from every raw record of `options`."""
        self.items: typing.List[Option] = [Option(o) for o in options]

    @classmethod
    def from_options(cls, options: typing.List[Option]):
        """Wrap already built `Option` objects without re-parsing them."""
        result = cls([])
        result.items = options
        return result

    # product is related to the current options set
    def save(self, product: stb_models.Product):
        """Save options and their tags, then redirect old product URLs.

        Each redirect leads from the old product URL to the product URL with
        `?option_id=<id>` of the saved option. The id is read after
        `bulk_create`; the tag saving below already needs saved options.

        :param product: saved product model that owns the options.
        """
        saved = stb_models.Option.objects.bulk_create([
            o.to_model(product) for o in self.items
        ])
        # The site does not depend on the option: query it once.
        site = Site.objects.first()
        redirects = []
        for data, model in zip(self.items, saved):
            assert data.code == model.code
            assert data.price == model.price
            assert data.mark == model.mark
            data.tags.save(model)
            # Rebuild the patched url now that the option has its id;
            # the value set by `to_model` was computed before saving.
            model.url_ = product.url + f'?option_id={model.id}'
            redirects.append(Redirect(
                old_path=data.old_url,
                new_path=model.url_,
                site=site,
            ))
        Redirect.objects.bulk_create(redirects)
class Parent(DataType):
    """Reference to a parent category: only its `id` and `name`."""

    PLAIN = ['id', 'name']

    # Class-level defaults for the PLAIN fields.
    id = None
    name = None
class Category(DataType):
    """Category record from the old data, with its page and parent reference.

    Several methods read the module-level `categories` collection, so they
    only work after it has been created.
    """

    PLAIN = ['id', 'name', 'specification']
    RELATED = ['page', 'parent']

    id, name, specification = 3 * (None, )
    page, parent = 2 * (None, )
    # Per-instance memo for the `filtered` property.
    _filtered = None

    def __init__(self, category: DataItem):
        """
        :param category: raw record with the `PLAIN` keys plus nested `page`
            and `parent` records.
        """
        super().__init__(category)
        self.page = Page(category['page'])
        self.parent = Parent(category['parent'])

    @staticmethod
    def from_parent(parent: Parent) -> 'Category':
        """Return the category from `categories` whose id equals `parent.id`.

        :raises AssertionError: unless exactly one category matches.
        """
        similar = [c for c in categories.items if c.id == parent.id]
        assert len(similar) == 1, [s.name for s in similar]
        return similar[0]

    def is_second_level(self):
        """Tell whether the parent of this category is a root category."""
        return any(self.parent.id == root.id for root in categories.get_roots())

    def is_root(self):
        """Tell whether this category is one of the root categories."""
        return any(self.id == root.id for root in categories.get_roots())

    @property
    def url(self) -> str:
        """URL reversed for the `category` route with this category id."""
        return reverse('category', args=(self.id,))

    @property
    def filtered(self):
        """Nearest category, starting from self and walking up the parents,
        whose name does not contain 'Серия'.

        The result is memoized on the instance.
        """
        if self._filtered is None:
            category = self
            while 'Серия' in category.name:
                category = Category.from_parent(category.parent)
            self._filtered = category
        return self._filtered

    def save(self) -> typing.Union[stb_models.Category, None]:
        """Return the DB category with this name, creating it when missing.

        Root categories are skipped. A missing parent is saved first,
        recursively; categories right below a root are created without a
        parent.

        :return: the existing or created category, `None` for a root.
        :raises AssertionError: if the existing category has another page
            slug, if a page with this name exists without a category, or if
            the parent could not be resolved.
        """
        if self.is_root():
            return None

        category = stb_models.Category.objects.filter(name=self.name).first()
        if category:
            assert category.page.slug == self.page.slug
            return category

        assert not pages_models.Page.objects.filter(name=self.name).exists(), \
            f'Have no category, but have page. Name is {self.name}'

        if self.is_second_level():
            parent = None
        else:
            parent = stb_models.Category.objects.filter(name=self.parent.name).first()
            if not parent:
                parent = Category.from_parent(self.parent).save()
            assert parent, self.parent.name

        page = self.page.to_model(parent.page if parent else None)
        page.related_model_name = 'stroyprombeton_category'
        page.type = 'model'
        page.save()
        return (
            stb_models.Category.objects
            .create(name=self.name, page=page, parent=parent)
        )
class Product(DataType):
    """Product record from the old data, with its page and category."""

    PLAIN = [
        'id', 'name',
        'price', 'mark',
        # tags content
        'specification',
        'length', 'width', 'height',
        'weight', 'volume',
        'diameter_out', 'diameter_in'
    ]
    RELATED = ['page', 'category', 'tags']

    id, name, price, mark, specification = 5 * (None, )
    page, category, tags = 3 * (None, )
    # Assigned by `from_group`; stays None for a product that was not grouped.
    options = None

    def __init__(self, product: DataItem):
        """
        :param product: raw record with the `PLAIN` keys plus nested `page`
            and `category` records.
        """
        super().__init__(product)
        self.page = Page(product['page'])
        self.category = Category(product['category'])

    @property
    def url(self) -> str:
        """URL reversed for the `product` route with this product id."""
        return reverse('product', args=(self.id,))

    @classmethod
    def from_group(cls, group: typing.Iterable['Product']) -> 'Product':
        """Collapse a group of products into its first product.

        Every product of the group, the first one included, becomes an
        option of the first product, which is modified in place.

        :param group: non-empty iterable of products.
        :return: the first product with `options` assigned.
        """
        group = list(group)
        product = group[0]
        product.options = Options.from_options(
            [Option.from_product(p, product) for p in group]
        )
        assert all(isinstance(o.product, int) for o in product.options)
        assert all(o.product == product.id for o in product.options)
        return product

    def save(self) -> stb_models.Product:
        """Return the DB product for this record, creating it when missing,
        and save its options.

        The product is looked up by its name and the name of its filtered
        category. When it is missing, a page and the product are created
        under that category.

        :raises AssertionError: if the product is missing but a page with
            its name already exists under the category name.
        """
        category_name = self.category.filtered.name
        product = (
            stb_models.Product.objects
            .filter(name=self.name, category__name=category_name)
            .first()
        )
        if not product:
            # NOTE(review): this check uses the raw category name while the
            # lookups around it use the filtered one; confirm it is intended.
            assert not \
                (
                    pages_models.Page.objects
                    .filter(name=self.name, parent__name=self.category.name)
                ), \
                f'Have no product, but have page. Name is {self.name}'
            # One query serves both the page parent and the product category.
            category = stb_models.Category.objects.get(name=category_name)
            page = self.page.to_model(parent=category.page)
            page.related_model_name = 'stroyprombeton_product'
            page.type = 'model'
            page.save()
            product = (
                stb_models.Product.objects
                .create(name=self.name, page=page, category=category)
            )
        # A product that never went through `from_group` has no options.
        if self.options is not None:
            self.options.save(product)
        return product
class Products(ListDataType):
    """Products grouped by name and filtered category name."""

    def __init__(self, products: typing.List[DataItem]):
        """Parse raw records and merge each group into one `Product`.

        :param products: raw product records.
        """
        def group_key(product):
            return product.name, product.category.filtered.name

        # groupby merges only adjacent items, so sort by the same key first
        parsed = sorted([Product(raw) for raw in products], key=group_key)
        grouped = []
        for _, members in groupby(parsed, key=group_key):
            grouped.append(Product.from_group(members))
        self.items: typing.List[Product] = grouped

    def options_count(self):
        """Return the total number of options over all products."""
        total = 0
        for product in self.items:
            total += product.options.count()
        return total
class Categories(ListDataType):
    """All categories of the old data.

    Counting and iteration skip categories whose name contains 'Серия';
    `items` itself keeps every category.
    """

    # Per-instance memo for `get_roots`.
    _roots = None

    def __init__(self, categories: typing.List[DataItem]):
        """Parse raw records and order them by `(parent id or 0, id)`."""
        items = [Category(c) for c in categories]
        self.items: typing.List[Category] = sorted(items, key=lambda c: (c.parent.id or 0, c.id))

    def get_roots(self) -> typing.List[Category]:
        """Return the categories whose parent id is falsy.

        The list is computed on the first call and reused afterwards.
        """
        if self._roots is None:
            self._roots = [c for c in self.items if not c.parent.id]
        return self._roots

    def count(self):
        """Return the number of categories without 'Серия' in the name."""
        return len([i for i in self.items if 'Серия' not in i.name])

    def __iter__(self) -> typing.Iterator[Category]:
        """Iterate over the categories without 'Серия' in the name."""
        for i in self.items:
            if 'Серия' not in i.name:
                yield i

    def save(self):
        """Save the iterated categories, then create redirects.

        A redirect is created for every category of `items` whose id is not
        an id of a category in the DB. It leads from the old category URL to
        the URL of the DB category named like the filtered category.
        """
        super().save()
        # Build the lookup set and fetch the site once, not per category.
        existing_ids = set(stb_models.Category.objects.values_list('id', flat=True))
        site = Site.objects.first()
        Redirect.objects.bulk_create([
            Redirect(
                old_path=category.url,
                new_path=(
                    stb_models.Category.objects
                    .get(name=category.filtered.name)
                ).url,
                site=site,
            ) for category in self.items if category.id not in existing_ids
        ])
@contextmanager
def non_persistence():
    """Run the wrapped block inside a transaction and roll its changes back.

    A savepoint is taken before the block and rolled back after the block
    returns normally.
    """
    with transaction.atomic():
        savepoint_id = transaction.savepoint()
        yield
        # Reached only when the wrapped block did not raise.
        transaction.savepoint_rollback(savepoint_id)
@contextmanager
def log_count(klass):
    """Log how many rows `klass` has before and after the wrapped block.

    :param klass: class exposing `objects.count()`; its lowercased name with
        an `s` appended is used as the label.
    """
    label = f'{klass.__name__.lower()}s'
    count_before = klass.objects.count()
    yield
    count_after = klass.objects.count()
    log(f'{label} {count_before} -> {count_after}')
log('--- parse json ---')

# The dumps contain Cyrillic names: read them as UTF-8 regardless of the
# platform default encoding.
# `categories` must exist before products are parsed: Category methods read it.
with open('categories.json', 'r', encoding='utf-8') as file:
    categories = Categories(json.load(file))

with open('products.json', 'r', encoding='utf-8') as file:
    products = Products(json.load(file))

# Sanity checks on the parsed data before anything touches the DB.
category_ids = {c.id for c in categories}
assert all('Серия' not in p.category.filtered.name for p in products)
assert all(p.category.filtered.id in category_ids for p in products)

product_ids = {p.id for p in products}
assert all(
    o.product in product_ids
    for p in products for o in p.options
)

log('--- save data to the DB ---')

# Everything below runs inside `non_persistence`, which rolls the changes back.
with non_persistence(), log_count(Redirect):
    with log_count(stb_models.Category):
        categories.save()

    # Tags fetch their group by name, so this group has to exist before the
    # products are saved.
    stb_models.TagGroup.objects.create(name='Рабочая документация')
    with \
            log_count(stb_models.Tag), \
            log_count(stb_models.Option), \
            log_count(stb_models.Product):
        products.save()

log('--- finished ---')
explored fields merging: it's not required.
Started to save fetched models to the DB
saved categories and products to DB.
Options, tags and Series left
- save options to the DB
- create tags from option's fields
- save tags to the DB
We have this output
categories 191 | 554 -> 554
products 710 | 533 -> 768
options 12197 -> 13611
tags 4073 -> 5266
products 710 | 533 -> 768
means that
- there were 710 products in the current production DB
- 533 products were in the old DB
- the new production DB has 768 products
- group products by (category, name) instead of name
- exclude series categories
exclude series categories before products grouping
- save only non-filtered categories
- fix products filtering
Output with models count:
categories 191 | 193 -> 196
products 710 | 818 -> 818
options 12197 -> 14132
tags 4073 -> 5699
- created redirects for categories
- created redirects from old products to new options
categorys 191 -> 196
products 710 -> 818
options 12197 -> 14132
tags 4073 -> 5699
redirects 197 -> 2495
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
appended options to grouped products.
Output from the local app: