Last active
February 5, 2021 11:39
-
-
Save Mifody/d83f504c3ed4eb06ee102a77aabd3b24 to your computer and use it in GitHub Desktop.
scrapy запись в базу чанками
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# Define your item pipelines here | |
# | |
# Don't forget to add your pipeline to the ITEM_PIPELINES setting | |
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html | |
from rssparser.db_utils import * | |
from rssparser.items import * | |
import logging | |
import json | |
class FeedlyPipeline:
    """Buffer Feedly items and write them to the database in chunks.

    Items are accumulated in ``self.items`` and persisted inside a single
    ``db_handle.atomic()`` block when the buffer reaches ``chunk_size``,
    when a ``FlushItem`` arrives, or when the spider closes.
    """

    # Number of buffered items that triggers an automatic flush.
    chunk_size = 1000

    def __init__(self):
        super().__init__()
        self.items = []

    def process_item(self, item, spider):
        """Buffer *item* and flush once ``chunk_size`` items are pending.

        A ``FlushItem`` is a control message: it forces a flush and is not
        buffered.

        Returns:
            *item* for regular items, ``None`` for a ``FlushItem``.
        """
        if isinstance(item, FlushItem):
            self.flush_data()
            # NOTE(review): Scrapy documents that process_item must return an
            # item, a Deferred, or raise DropItem; returning None hands None to
            # any later pipeline. Kept for backward compatibility - confirm.
            return None
        self.items.append(item)
        if len(self.items) >= self.chunk_size:
            self.flush_data()
        return item

    def close_spider(self, spider):
        """Persist whatever is still buffered when the spider finishes."""
        if self.items:
            self.flush_data()

    def flush_data(self):
        """Write all buffered items to the database in one transaction.

        The buffer is detached *before* writing. If the batch raises, the
        exception still propagates to the caller, but the failed batch is not
        re-attempted (and re-failed) on every subsequent item.
        """
        batch, self.items = self.items, []
        if not batch:
            return
        with db_handle.atomic() as transaction:
            for item in batch:
                if isinstance(item, FeedlyTagItem):
                    self._save_tag(item)
                elif isinstance(item, FeedlyDataItem):
                    self._save_data(item)
            transaction.commit()

    @staticmethod
    def _save_tag(item):
        """Update a known tag's status by id; otherwise insert it if its url is new."""
        if 'id' in item and FeedlyTagsModel.select().where(
            FeedlyTagsModel.id == item['id']
        ).exists():
            FeedlyTagsModel.update(
                {FeedlyTagsModel.status: item['status']}
            ).where(FeedlyTagsModel.id == item['id']).execute()
        elif not FeedlyTagsModel.select().where(
            FeedlyTagsModel.url == item['url']
        ).exists():
            FeedlyTagsModel.insert(**dict(item)).execute()

    @staticmethod
    def _save_data(item):
        """Insert a data item unless its rss_url is already stored; log insert failures."""
        already_stored = FeedlyDataModel.select().where(
            FeedlyDataModel.rss_url == item['rss_url']
        ).exists()
        if already_stored:
            return
        try:
            # Assuming db_handle is a peewee Database, a nested atomic() is a
            # savepoint: a failed insert rolls back only itself and leaves the
            # outer batch transaction usable (TODO confirm for the backend).
            with db_handle.atomic():
                FeedlyDataModel.insert(**dict(item)).execute()
        except Exception:
            # default=str keeps the diagnostic dump from raising on values
            # json cannot encode (e.g. datetimes).
            logging.exception(
                'Error insert item [data]: %s',
                json.dumps(dict(item), ensure_ascii=False, default=str),
            )
class RssparserPipeline:
    """Buffer scraped items as plain dicts and flush them in chunks.

    NOTE(review): flush_data currently persists nothing. The database write
    is commented out, so a flush only iterates the batch and commits an
    otherwise empty transaction. TODO: restore the intended write.
    """

    # Number of buffered items that triggers an automatic flush.
    chunk_size = 1000

    def __init__(self):
        super().__init__()
        self.items = []

    def process_item(self, item, spider):
        """Buffer a dict copy of *item*, flushing once ``chunk_size`` items are pending.

        Returns:
            The original *item*, unchanged.
        """
        self.items.append(dict(item))
        if len(self.items) >= self.chunk_size:
            self.flush_data()
        return item

    def close_spider(self, spider):
        """Flush whatever is still buffered when the spider finishes."""
        if self.items:
            self.flush_data()

    def flush_data(self):
        """Process all buffered items inside one database transaction.

        The buffer is detached *before* processing. If an item raises (e.g. a
        missing 'status' key), the exception still propagates, but the bad
        batch is not retried on every subsequent item.
        """
        batch, self.items = self.items, []
        if not batch:
            return
        with db_handle.atomic() as transaction:
            for item in batch:
                # Record shape intended for the (currently disabled) write below.
                rec = {
                    'status': item['status'],
                    'data': item,
                }
                # TODO(review): write disabled upstream; intended call was
                # CatalogModel.update(**rec).where(CatalogModel.id == item['id']).execute()
            transaction.commit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment