Skip to content

Instantly share code, notes, and snippets.

@Mifody
Last active February 5, 2021 11:39
Show Gist options
  • Save Mifody/d83f504c3ed4eb06ee102a77aabd3b24 to your computer and use it in GitHub Desktop.
Save Mifody/d83f504c3ed4eb06ee102a77aabd3b24 to your computer and use it in GitHub Desktop.
scrapy запись в базу чанками
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
from rssparser.db_utils import *
from rssparser.items import *
import logging
import json
class FeedlyPipeline:
    """Scrapy item pipeline that buffers items and writes them to the DB in chunks.

    Items are collected in ``self.items`` and written inside a single
    ``db_handle.atomic()`` block when one of the following happens:

    * a ``FlushItem`` is received (explicit flush request),
    * the buffer grows beyond ``chunk_size`` items,
    * the spider is closed while the buffer is non-empty.

    ``FeedlyTagItem`` and ``FeedlyDataItem`` instances are persisted; any other
    buffered item type is silently discarded on flush.
    """

    # A flush is triggered once the buffer holds MORE than this many items.
    chunk_size = 1000

    def __init__(self):
        self.items = []
        super().__init__()

    def process_item(self, item, spider):
        """Buffer ``item`` and flush the buffer when required.

        Args:
            item: The scraped item, or a ``FlushItem`` marker requesting a flush.
            spider: The spider that produced the item (unused).

        Returns:
            ``item`` for regular items, ``None`` for a ``FlushItem`` marker.
        """
        if isinstance(item, FlushItem):
            self.flush_data()
            # NOTE(review): Scrapy expects process_item to return an item or
            # raise DropItem; returning None hands None to any later pipeline.
            # Kept as-is to preserve behaviour - confirm whether DropItem is
            # the intended outcome for the marker.
            return None

        self.items.append(item)
        if len(self.items) > self.chunk_size:
            self.flush_data()
        return item

    def close_spider(self, spider):
        """Write whatever is still buffered when the spider finishes."""
        if self.items:
            self.flush_data()

    def flush_data(self):
        """Persist every buffered item in one atomic block, then empty the buffer.

        If the atomic block raises, the buffer is left untouched.
        """
        with db_handle.atomic() as transaction:
            for item in self.items:
                if isinstance(item, FeedlyTagItem):
                    self._save_tag(item)
                elif isinstance(item, FeedlyDataItem):
                    self._save_data(item)
            # Explicit commit kept from the original implementation.
            # NOTE(review): presumably redundant, since leaving the atomic()
            # block should commit as well - confirm against the DB layer.
            transaction.commit()
        self.items = []

    @staticmethod
    def _save_tag(item):
        """Update the status of a known tag, or insert the tag if its URL is new."""
        if 'id' in item and FeedlyTagsModel.select().where(
                FeedlyTagsModel.id == item['id']).exists():
            FeedlyTagsModel.update({FeedlyTagsModel.status: item['status']}) \
                .where(FeedlyTagsModel.id == item['id']).execute()
        elif not FeedlyTagsModel.select().where(
                FeedlyTagsModel.url == item['url']).exists():
            FeedlyTagsModel.insert(**dict(item)).execute()

    @staticmethod
    def _save_data(item):
        """Insert a feed record unless one with the same ``rss_url`` already exists.

        Insert failures are logged (with traceback) and do not abort the flush.
        """
        already_stored = FeedlyDataModel.select().where(
            FeedlyDataModel.rss_url == item['rss_url']).exists()
        if already_stored:
            return
        try:
            FeedlyDataModel.insert(**dict(item)).execute()
        except Exception:
            # Best-effort insert: keep processing the rest of the chunk.
            # default=str keeps the log call itself from raising on values
            # json cannot serialise, which would hide the real error.
            # NOTE(review): on backends that abort the whole transaction after
            # a failed statement, later statements in this chunk may fail too;
            # consider wrapping the insert in a nested atomic() block.
            logging.exception(
                'Error insert item [tags]: %s',
                json.dumps(dict(item), default=str),
            )
class RssparserPipeline:
    """Scrapy item pipeline that buffers items as plain dicts and flushes in chunks.

    Each processed item is copied into a ``dict`` and appended to
    ``self.items``. The buffer is flushed when it grows beyond ``chunk_size``
    items, or when the spider is closed with a non-empty buffer.

    NOTE: the statement that would write a record to the database is currently
    commented out in ``flush_data``, so flushing only empties the buffer.
    """

    # A flush is triggered once the buffer holds MORE than this many items.
    chunk_size = 1000

    def __init__(self):
        self.items = []
        super().__init__()

    def process_item(self, item, spider):
        """Buffer a dict copy of ``item`` and flush once the buffer is full.

        Args:
            item: The scraped item; must be convertible with ``dict(item)``.
            spider: The spider that produced the item (unused).

        Returns:
            The original ``item``, unchanged, for the next pipeline stage.
        """
        self.items.append(dict(item))
        if len(self.items) > self.chunk_size:
            self.flush_data()
        return item

    def close_spider(self, spider):
        """Flush whatever is still buffered when the spider finishes."""
        if self.items:
            self.flush_data()

    def flush_data(self):
        """Walk the buffered items inside one atomic block, then empty the buffer.

        Raises:
            KeyError: If a buffered item has no ``'status'`` key.
        """
        with db_handle.atomic() as transaction:
            for item in self.items:
                # Record that the disabled update below would write; building
                # it still validates that every item carries a 'status'.
                rec = {
                    'status': item['status'],
                    'data': item,
                }
                # TODO: persistence is disabled - re-enable once confirmed:
                # CatalogModel.update(**rec).where(CatalogModel.id == item['id']).execute()
            # Explicit commit kept from the original implementation.
            transaction.commit()
        self.items = []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment