Scrapy Item Pipeline that publishes to RabbitMQ
import pika

from scrapy.utils.serialize import ScrapyJSONEncoder


class RabbitMQItemPublisherPipeline(object):

    def __init__(self, host, port, user, password, virtual_host,
                 exchange, routing_key, queue):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.virtual_host = virtual_host
        credentials = pika.PlainCredentials(self.user, self.password)
        parameters = pika.ConnectionParameters(self.host,
                                               self.port,
                                               self.virtual_host,
                                               credentials)
        # Connecting to RabbitMQ
        self.connection = pika.BlockingConnection(parameters=parameters)
        self.channel = self.connection.channel()
        self.exchange = exchange
        self.routing_key = routing_key
        self.queue = queue
        # Declaring a durable RabbitMQ exchange
        self.channel.exchange_declare(exchange=exchange,
                                      exchange_type="direct",
                                      durable=True)
        # Declaring a durable RabbitMQ queue
        self.channel.queue_declare(queue=queue,
                                   durable=True)
        # Binding the queue to the exchange via the routing key
        self.channel.queue_bind(exchange=exchange,
                                routing_key=routing_key,
                                queue=queue)
        self.encoder = ScrapyJSONEncoder()

    @classmethod
    def from_crawler(cls, crawler):
        # Creating a RabbitMQItemPublisherPipeline from the crawler settings
        return cls(
            host=crawler.settings.get("RABBITMQ_HOST"),
            port=crawler.settings.get("RABBITMQ_PORT"),
            user=crawler.settings.get("RABBITMQ_USER"),
            password=crawler.settings.get("RABBITMQ_PASSWORD"),
            virtual_host=crawler.settings.get("RABBITMQ_VIRTUAL_HOST"),
            exchange=crawler.settings.get("RABBITMQ_EXCHANGE"),
            routing_key=crawler.settings.get("RABBITMQ_ROUTING_KEY"),
            queue=crawler.settings.get("RABBITMQ_QUEUE"),
        )

    def close_spider(self, spider):
        # Closing the RabbitMQ channel and connection when the spider finishes
        self.channel.close()
        self.connection.close()

    def process_item(self, item, spider):
        # Encoding the item dict using the Scrapy JSON encoder
        data = self.encoder.encode(item)
        # Publishing the item to the exchange with the configured routing key
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=self.routing_key,
            body=data,
        )
        # Returning the item so later pipelines can still process it
        return item
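
Because from_crawler reads everything from the crawler settings, the pipeline has to be enabled and given its RABBITMQ_* keys in the project's settings.py. A minimal sketch follows; the module path myproject.pipelines, the priority 300, and every connection value are placeholder assumptions, not part of the gist:

# settings.py -- a minimal sketch; the module path and all values below
# are placeholders, not defined anywhere in the gist.
ITEM_PIPELINES = {
    "myproject.pipelines.RabbitMQItemPublisherPipeline": 300,
}

RABBITMQ_HOST = "localhost"
RABBITMQ_PORT = 5672
RABBITMQ_USER = "guest"
RABBITMQ_PASSWORD = "guest"
RABBITMQ_VIRTUAL_HOST = "/"
RABBITMQ_EXCHANGE = "scrapy"
RABBITMQ_ROUTING_KEY = "items"
RABBITMQ_QUEUE = "items"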
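
For completeness, the items published by process_item can be read back from the declared queue with a plain pika consumer. This is a hypothetical sketch, not part of the gist, assuming pika 1.x, the same local broker, and the placeholder queue name "items" from the settings above:

import json

import pika

# A hypothetical standalone consumer for the queue declared by the pipeline.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

def on_message(channel, method, properties, body):
    # Items were serialized with ScrapyJSONEncoder, so they decode as JSON
    item = json.loads(body)
    print(item)
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="items", on_message_callback=on_message)
channel.start_consuming()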