-
-
Save thelinuxlich/c459ca5cd77718307a58a4a3e3c335c5 to your computer and use it in GitHub Desktop.
Adapting a Node.js scraper to Crystal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "json" | |
require "redis" | |
require "halite" | |
require "pg" | |
require "logger" | |
require "amqp-client" | |
require "logger" | |
require "openssl" | |
# Disable STDOUT buffering so every write is flushed immediately.
STDOUT.sync = true
# Shared logger for the whole script; DEBUG and above are emitted.
log = Logger.new(STDOUT, level: Logger::DEBUG)
# Fields decoded from the MercadoLibre `/users/:id` response. Keys not
# declared here are ignored by JSON::Serializable.
struct Seller
  include JSON::Serializable

  getter nickname : String, permalink : String
  getter seller_reputation : SellerReputation
end
# Reputation section of a seller record; only the transaction counters
# are decoded.
struct SellerReputation
  include JSON::Serializable

  getter(transactions : Transactions)
end
# Completed / canceled transaction counters from a seller's reputation.
struct Transactions
  include JSON::Serializable

  getter completed : Int32, canceled : Int32
end
# One unit of work received from the queue. `key` identifies the item (used
# for the Redis cache key, the visits API and the ITEMS rows); `permalink` is
# the page URL that gets fetched.
struct Job
  include JSON::Serializable

  getter key : String, permalink : String
end
# Browser-like User-Agent sent with every outgoing HTTP request.
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36"
PG_URL = "postgres://#{ENV["PGUSER"]}:#{ENV["PGPASSWORD"]}@#{ENV["PGHOST"]}:5432/#{ENV["PGDATABASE"]}"

# Fixed delay before any connection is opened.
# NOTE(review): presumably gives Postgres/Redis/RabbitMQ time to start; a
# connect-with-retry loop would be more reliable than a fixed sleep.
sleep 10.seconds

# NOTE(review): `db` is shared by all WORKERS fibers below. Confirm that
# PG.connect returns a pooled DB::Database (as DB.open does) rather than a
# single connection, and that the pool size fits the server's max_connections.
db = PG.connect PG_URL

# The worker fibers issue Redis commands concurrently. With one plain `Redis`
# connection their requests and replies could interleave on a single socket;
# PooledClient checks out a separate connection for each command.
redis = Redis::PooledClient.new host: ENV["REDISHOST"], port: ENV["REDISPORT"].to_i, password: ENV["REDISPASSWORD"]

# In-process buffer between the AMQP subscriber and the worker fibers.
channel = Channel(String).new(1_000_000)
WORKERS      = 1000
HTTP_TIMEOUT = 30
# A failing payload is re-enqueued until it has failed this many times, then dropped.
MAX_ATTEMPTS = 3
# NOTE(review): certificate verification is disabled for every HTTPS request.
tls = OpenSSL::SSL::Context::Client.insecure

# Failure count for each payload that is still being retried. It is mutated
# without a lock: this relies on Crystal's default single-threaded scheduler,
# and no fiber switch happens between reading and writing an entry below.
failures = Hash(String, Int32).new(0)

VENDOR_UPSERT_SQL = <<-SQL
  INSERT INTO VENDORS(NAME, KEY, PERMALINK,TRANSACTIONS_COMPLETED, TRANSACTIONS_CANCELLED) VALUES ($1,$2,$3,$4,$5) ON CONFLICT (key)
  DO UPDATE SET
  name = EXCLUDED.name,
  permalink = EXCLUDED.PERMALINK,
  transactions_completed = EXCLUDED.transactions_completed,
  transactions_cancelled = EXCLUDED.transactions_cancelled,
  updated_at = CURRENT_TIMESTAMP
  SQL

# Removes every non-digit character from *raw* and parses what remains.
# Returns 0 when *raw* is nil or no valid Int32 is left.
def parse_count(raw : String?) : Int32
  raw.try(&.gsub(/[^0-9]+/, "").to_i?) || 0
end

AMQP::Client.start("amqp://#{ENV["RABBITMQ_USER"]}:#{ENV["RABBITMQ_PASSWORD"]}@#{ENV["RABBITMQ_HOST"]}:#{ENV["RABBITMQ_PORT"].to_i}") do |c|
  c.channel do |ch|
    queue = ch.queue(ENV["RABBITMQ_QUEUE"])

    WORKERS.times do
      spawn do
        loop do
          payload = channel.receive

          # A payload that cannot be decoded will never succeed, so it is
          # dropped instead of retried. Rescuing here also keeps the exception
          # from terminating this worker fiber.
          job = begin
            Job.from_json payload
          rescue ex : JSON::ParseException
            log.error("Dropping malformed job payload #{payload.inspect}: #{ex.message}")
            next
          end

          # Everything that can fail (Redis, HTTP, parsing, SQL) runs inside this
          # begin so a failure is retried instead of killing the fiber.
          begin
            if redis.get("price_item:#{job.key}") == "1"
              p "Item #{job.key} already exists..."
            else
              html = Halite.timeout(HTTP_TIMEOUT).user_agent(user_agent).follow.get(job.permalink, tls: tls).body

              # Collect `"field":value,` pairs from the page source (the last
              # occurrence wins). The value class admits `"` so quoted strings
              # match too; their surrounding quotes are stripped here so they
              # are not stored in the database.
              captures = {} of String => String
              html.scan(/\"(sellerId|startTime|localItemPrice|itemId|quantity|availableStock|soldStock)\"\:([a-zA-Z0-9\:\.\-\"]*)\,/) do |m|
                captures[m[1]] = m[2].strip('"')
              end

              seller_id = captures["sellerId"]?
              alternate_item_id = captures["itemId"]?
              start_time = captures["startTime"]? || Time.utc(1970, 1, 1, 0, 0, 0).to_rfc3339

              # Prefer localItemPrice, then the first {"type":"price",...} object.
              # Values that do not parse as floats count as 0.0 instead of raising.
              price = captures["localItemPrice"]?.try(&.to_f?) ||
                      html.match(/\{"type":"price","value":(.*?),/).try { |m| m[1].to_f? } ||
                      0.0
              available_quantity = parse_count(captures["availableStock"]?)
              sold_count = parse_count(captures["soldStock"]? || html.match(/<div class="item-conditions">(.*?)<\/div>/).try(&.[1]))

              # A page without a sellerId counts as a failed attempt.
              raise "no seller_id in #{job.permalink}" unless seller_id

              seller_response = Halite.timeout(HTTP_TIMEOUT).user_agent(user_agent).follow.use("cache").get("https://api.mercadolibre.com/users/#{seller_id}", tls: tls)
              seller = seller_response.status_code < 400 ? Seller.from_json(seller_response.body) : nil

              # A missing key, a non-object body or a non-integer value all count as 0 visits.
              visits = 0
              visits_response = Halite.timeout(HTTP_TIMEOUT).user_agent(user_agent).follow.use("cache").get("https://api.mercadolibre.com/visits/items?ids=#{job.key}", tls: tls)
              if visits_response.status_code < 400
                visits = JSON.parse(visits_response.body).as_h?.try(&.[job.key]?).try(&.as_i?) || 0
              end

              # VENDORS and ITEMS are only written when the seller record was fetched.
              if seller
                transactions = seller.seller_reputation.transactions
                db.exec VENDOR_UPSERT_SQL, seller.nickname, seller_id, seller.permalink, transactions.completed, transactions.canceled
                # THUMB is not scraped; it is always written as NULL.
                db.exec "UPDATE ITEMS SET START_TIME = $2, ALTERNATE_ITEM_ID = $3,UPDATED_AT = $4,THUMB = $5, VENDOR_ID = $6 WHERE KEY = $1", job.key, start_time, alternate_item_id, Time.utc.to_rfc3339, nil, seller_id
              end

              db.exec "INSERT INTO ITEMS_METRICS(ITEM_ID, SOLD_COUNT, PRICE, VISITS, AVAILABLE_QUANTITY,CREATED_AT) VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING", job.key, sold_count, price, visits, available_quantity, Time.utc.to_s("%F")
              redis.set "price_item:#{job.key}", "1", 86400
            end
            failures.delete(payload)
          rescue ex
            attempts = failures[payload] + 1
            if attempts < MAX_ATTEMPTS
              failures[payload] = attempts
              log.error("Job #{job.key} failed (attempt #{attempts}/#{MAX_ATTEMPTS}), re-enqueueing: #{ex.message}")
              channel.send payload
            else
              failures.delete(payload)
              log.error("Job #{job.key} failed #{attempts} times, giving up: #{ex.message}")
            end
          end
        end
      end
    end

    # NOTE(review): with no_ack: true the broker does not wait for an ack, so
    # messages still buffered in `channel` are lost if the process exits.
    queue.subscribe(no_ack: true, block: true) do |msg|
      channel.send(msg.body_io.to_s)
    end
  end
end
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment