An example pipeline that uses bulk insert while saving scraped items into the database using SQLAlchemy.
import logging
from scrapy import Spider
from sqlalchemy.orm import sessionmaker
from example.items import ProductItem
from example.models import Price, Product, create_table, db_connect

logger = logging.getLogger(__name__)


class ExampleScrapyPipeline:
    """
    An example pipeline that saves new products and their prices into the database.
    """

    def __init__(self):
        """
        Initialize the database connection and sessionmaker.
        """
        engine = db_connect()
        create_table(engine)
        self.Session = sessionmaker(bind=engine)
        self.products = []
        self.prices = []

    def process_item(self, item: ProductItem, spider: Spider) -> ProductItem:
        """
        This method is called for every item pipeline component.
        Each product is stored as a dict in `self.products` so that it can later be used for bulk saving.
        """
        product = dict(
            name=item['name'],
            vendor=item['vendor'],
            quantity=item['quantity'],
            url=item['url'],
        )
        self.products.append(product)
        self.prices.append(item['price'].amount)
        return item

    def close_spider(self, spider: Spider) -> None:
        """
        Save all the scraped products and prices in bulk on the spider close event.
        Sadly, we currently have to use `return_defaults=True` while bulk saving Product objects,
        which greatly reduces the performance gains of the bulk operation.
        Prices, however, do get inserted into the DB in bulk.
        Reference: https://stackoverflow.com/questions/36386359/sqlalchemy-bulk-insert-with-one-to-one-relation
        """
        session = self.Session()
        try:
            logger.info('Saving products in bulk operation to the database.')
            # `return_defaults=True` is needed so that the generated primary keys (inserted one at a time)
            # are available for foreign key usage in the price table.
            session.bulk_insert_mappings(Product, self.products, return_defaults=True)

            logger.info('Saving prices in bulk operation to the database.')
            prices = [dict(price=price, product_id=product['id']) for product, price in zip(self.products, self.prices)]
            session.bulk_insert_mappings(Price, prices)
            session.commit()
        except Exception as error:
            logger.exception(error, extra=dict(spider=spider))
            session.rollback()
            raise
        finally:
            session.close()
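
The pipeline above imports `Product`, `Price`, `db_connect` and `create_table` from `example.models`, which is not included in this gist. Below is a minimal sketch of what that module could look like, assuming declarative SQLAlchemy models and a `DATABASE_URL` project setting; the column types and the setting name are assumptions, not taken from the gist.

# models.py -- hypothetical sketch of the module the pipeline imports (not part of the original gist)
from scrapy.utils.project import get_project_settings
from sqlalchemy import Column, ForeignKey, Integer, Numeric, String, create_engine
from sqlalchemy.orm import declarative_base

Base = declarative_base()


def db_connect():
    """Create an engine from the project's DATABASE_URL setting (assumed setting name)."""
    return create_engine(get_project_settings().get('DATABASE_URL'))


def create_table(engine):
    """Create any tables that do not exist yet."""
    Base.metadata.create_all(engine)


class Product(Base):
    __tablename__ = 'product'

    id = Column(Integer, primary_key=True)  # PK returned by `return_defaults=True` and used as `product['id']`
    name = Column(String, nullable=False)
    vendor = Column(String)
    quantity = Column(String)  # type is a guess; the gist does not show the item schema
    url = Column(String)


class Price(Base):
    __tablename__ = 'price'

    id = Column(Integer, primary_key=True)
    price = Column(Numeric)  # the pipeline stores `item['price'].amount`, a numeric value
    product_id = Column(Integer, ForeignKey('product.id'))

As with any other Scrapy item pipeline, the pipeline itself would be enabled through the project's `ITEM_PIPELINES` setting.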
import logging
from urllib.parse import urlencode
from scrapy.utils.project import get_project_settings

logger = logging.getLogger(__name__)
settings = get_project_settings()


def get_proxy_url(url: str) -> str:
    """
    We send all our requests to the https://www.scraperapi.com/ API endpoint in order to use their proxy servers.
    This function converts a regular URL into a Scraper API proxy URL.
    """
    scraper_api_key = settings.get('SCRAPER_API_KEY')
    if not scraper_api_key:
        logger.warning('Scraper API key not set.', extra=dict(url=url))
        return url

    proxied_url = 'http://api.scraperapi.com/?' + urlencode({'api_key': scraper_api_key, 'url': url})
    logger.info(f'Scraping using Scraper API. URL <{url}>.')
    return proxied_url
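
The helper is typically called when building requests in a spider. A minimal usage sketch follows, assuming the helper is importable from the spider's module; the spider name, URL and import path below are placeholders, not part of the gist.

# spider.py -- hypothetical example of calling get_proxy_url when issuing requests
from scrapy import Request, Spider

from example.utils import get_proxy_url  # assumed import path; adjust to wherever the helper lives


class ExampleSpider(Spider):
    name = 'example'
    start_urls = ['https://example.com/products']  # placeholder URL

    def start_requests(self):
        for url in self.start_urls:
            # Route the request through Scraper API's proxy when an API key is configured,
            # otherwise get_proxy_url falls back to the original URL.
            yield Request(get_proxy_url(url), callback=self.parse)

    def parse(self, response):
        # Parse the (possibly proxied) response and yield items as usual.
        ...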