Skip to content

Instantly share code, notes, and snippets.

@thelinuxlich
Created June 19, 2020 02:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save thelinuxlich/c459ca5cd77718307a58a4a3e3c335c5 to your computer and use it in GitHub Desktop.
Adapting a Node.js scraper to Crystal
require "json"
require "redis"
require "halite"
require "pg"
require "logger"
require "amqp-client"
require "logger"
require "openssl"
# Unbuffered stdout so log lines show up immediately (e.g. under a container runtime).
STDOUT.sync = true

# Process-wide logger shared by every worker fiber; everything from DEBUG up is emitted.
# NOTE(review): Crystal's `Logger` was deprecated in 0.35 and removed in 1.0 in
# favour of `Log`; confirm the compiler version this targets.
log = Logger.new(STDOUT, level: Logger::DEBUG)
# Seller profile deserialized from the users endpoint the workers query
# (`https://api.mercadolibre.com/users/<seller_id>`).
#
# Only the fields declared here are mapped; any other JSON keys are ignored.
struct Seller
  include JSON::Serializable

  getter nickname : String, permalink : String
  getter seller_reputation : SellerReputation
end
# The `seller_reputation` object nested inside a Seller payload; only the
# transaction counters are mapped.
struct SellerReputation
  include JSON::Serializable

  getter(transactions : Transactions)
end
# Completed / canceled transaction counters from a seller's reputation.
# Note the API spells the second key `canceled` (single "l").
struct Transactions
  include JSON::Serializable

  getter completed : Int32, canceled : Int32
end
# A unit of work received from RabbitMQ: the item `key` (used for Redis/DB
# lookups and the visits API) and the item page URL to scrape.
struct Job
  include JSON::Serializable

  getter key : String, permalink : String
end
# Browser-like User-Agent header sent with every Halite request.
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36"

# Postgres connection URL built from the PG* environment variables.
# PGPORT is optional and defaults to 5432, the port that used to be hard-coded.
# NOTE(review): credentials are interpolated verbatim; a password containing
# URL-reserved characters such as "@" or "/" will produce a broken URL.
PG_URL = "postgres://#{ENV["PGUSER"]}:#{ENV["PGPASSWORD"]}@#{ENV["PGHOST"]}:#{ENV.fetch("PGPORT", "5432")}/#{ENV["PGDATABASE"]}"

# Fixed start-up delay before any connection is opened.
# NOTE(review): presumably gives Postgres/Redis/RabbitMQ time to come up; confirm.
sleep 10.seconds

# NOTE(review): all WORKERS fibers share this handle; confirm `PG.connect`
# yields a pooled `DB::Database` rather than a single connection.
db = PG.connect PG_URL

# A single Redis connection must not be used by many fibers at once (commands
# and replies would interleave on the socket), so the workers share a pool.
REDIS_POOL_SIZE = 50
redis = Redis::PooledClient.new host: ENV["REDISHOST"], port: ENV["REDISPORT"].to_i, password: ENV["REDISPASSWORD"], pool_size: REDIS_POOL_SIZE

# Buffer between the RabbitMQ subscriber and the worker fibers.
channel = Channel(String).new(1_000_000)

# Number of worker fibers spawned to process jobs.
WORKERS = 1000
# Value passed to `Halite.timeout` for every HTTP request.
HTTP_TIMEOUT = 30

# NOTE(review): `insecure` disables TLS certificate verification for every
# request; use OpenSSL::SSL::Context::Client.new if the target hosts allow it.
tls = OpenSSL::SSL::Context::Client.insecure

# Job counter; nothing in this file reads it.
count = 0
# Maximum processing attempts per payload before it is dropped. Without a cap,
# a job that can never succeed (e.g. a page with no sellerId) was pushed back
# onto the channel forever.
MAX_ATTEMPTS = 5

UPSERT_VENDOR_SQL = <<-SQL
  INSERT INTO VENDORS(NAME, KEY, PERMALINK, TRANSACTIONS_COMPLETED, TRANSACTIONS_CANCELLED) VALUES ($1, $2, $3, $4, $5)
  ON CONFLICT (key) DO UPDATE SET
    name = EXCLUDED.name,
    permalink = EXCLUDED.PERMALINK,
    transactions_completed = EXCLUDED.transactions_completed,
    transactions_cancelled = EXCLUDED.transactions_cancelled,
    updated_at = CURRENT_TIMESTAMP
  SQL

UPDATE_ITEM_SQL = "UPDATE ITEMS SET START_TIME = $2, ALTERNATE_ITEM_ID = $3, UPDATED_AT = $4, THUMB = $5, VENDOR_ID = $6 WHERE KEY = $1"

INSERT_METRICS_SQL = "INSERT INTO ITEMS_METRICS(ITEM_ID, SOLD_COUNT, PRICE, VISITS, AVAILABLE_QUANTITY, CREATED_AT) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING"

# Strips every non-digit character from *text* and parses what remains as an
# Int32. Returns 0 when *text* is nil or no valid Int32 remains.
def digits_to_i(text : String?) : Int32
  return 0 unless text
  text.gsub(/[^0-9]+/, "").to_i? || 0
end

# Consumes jobs from RabbitMQ and hands them to WORKERS fibers through
# `channel`. Each job scrapes the item page plus the seller and visits APIs,
# writes the results to Postgres, and marks the item key in Redis for 24h.
AMQP::Client.start("amqp://#{ENV["RABBITMQ_USER"]}:#{ENV["RABBITMQ_PASSWORD"]}@#{ENV["RABBITMQ_HOST"]}:#{ENV["RABBITMQ_PORT"].to_i}") do |c|
  c.channel do |ch|
    queue = ch.queue(ENV["RABBITMQ_QUEUE"])
    # Failed-attempt counts keyed by raw payload.
    # NOTE(review): relies on the default single-threaded fiber runtime
    # (no -Dpreview_mt) for unsynchronized Hash access.
    attempts = Hash(String, Int32).new(0)

    WORKERS.times do
      spawn do
        loop do
          payload = channel.receive

          # A payload that is not valid Job JSON will never parse: log and drop
          # it instead of letting the exception kill this worker fiber.
          job = begin
            Job.from_json payload
          rescue ex
            log.error("Discarding malformed job #{payload}: #{ex.message}")
            nil
          end
          next unless job

          begin
            if redis.get("price_item:#{job.key}") == "1"
              attempts.delete(payload)
              log.debug("Item #{job.key} already exists...")
              next
            end

            item = Halite.timeout(HTTP_TIMEOUT).user_agent(user_agent).follow.get(job.permalink, tls: tls).body
            # Pull `"key":value,` pairs for the fields of interest out of the page source.
            captures = item.scan(/\"(sellerId|startTime|localItemPrice|itemId|quantity|availableStock|soldStock)\"\:([a-zA-Z0-9\:\.\-\"]*)\,/).map { |n| [n[1], n[2]] }.to_h

            seller_id = captures["sellerId"]?
            raise "no seller_id in #{job.permalink}" unless seller_id

            alternate_item_id = captures["itemId"]?
            start_time = captures["startTime"]? || Time.utc(1970, 1, 1, 0, 0, 0).to_rfc3339
            # Prefer localItemPrice; fall back to the first {"type":"price","value":…}
            # object; use 0.0 when neither parses (`to_f?` never raises).
            price = captures["localItemPrice"]?.try(&.to_f?) ||
                    item.match(/\{"type":"price","value":(.*?),/).try { |m| m[1].to_f? } ||
                    0.0
            available_quantity = digits_to_i(captures["availableStock"]?)
            # When soldStock is absent, fall back to the "item-conditions" div text.
            sold_count = digits_to_i(captures["soldStock"]? || item.match(/<div class="item-conditions">(.*?)<\/div>/).try(&.[1]))
            thumb = nil

            seller_response = Halite.timeout(HTTP_TIMEOUT).user_agent(user_agent).follow.use("cache").get("https://api.mercadolibre.com/users/#{seller_id}", tls: tls)
            seller = Seller.from_json(seller_response.body) if seller_response.status_code < 400

            item_update = {start_time: start_time, alternate_item_id: alternate_item_id, updated_at: Time.utc.to_rfc3339, thumb: thumb, vendor_id: seller_id}
            vendor = seller.try do |s|
              {name: s.nickname, key: seller_id, permalink: s.permalink, transactions_completed: s.seller_reputation.transactions.completed, transactions_cancelled: s.seller_reputation.transactions.canceled}
            end

            visits = 0
            url_visit = Halite.timeout(HTTP_TIMEOUT).user_agent(user_agent).follow.use("cache").get("https://api.mercadolibre.com/visits/items?ids=#{job.key}", tls: tls)
            if url_visit.status_code < 400
              # The body may not be an object, or may lack this key; `[]?` avoids a
              # KeyError that would otherwise fail (and retry) the whole job.
              if item_visits = JSON.parse(url_visit.body).as_h?.try { |h| h[job.key]? }.try(&.as_i?)
                visits += item_visits
              end
            end

            if vendor
              db.exec UPSERT_VENDOR_SQL, vendor[:name], vendor[:key], vendor[:permalink], vendor[:transactions_completed], vendor[:transactions_cancelled]
              # NOTE(review): the item row is only updated when the seller was fetched;
              # presumably because VENDOR_ID must reference an existing vendor — confirm.
              db.exec UPDATE_ITEM_SQL, job.key, item_update[:start_time], item_update[:alternate_item_id], item_update[:updated_at], item_update[:thumb], item_update[:vendor_id]
            end
            db.exec INSERT_METRICS_SQL, job.key, sold_count, price, visits, available_quantity, Time.utc.to_s("%F")

            redis.set "price_item:#{job.key}", "1", 86400
            attempts.delete(payload)
          rescue ex
            tries = attempts[payload] + 1
            if tries < MAX_ATTEMPTS
              attempts[payload] = tries
              log.error("Attempt #{tries}/#{MAX_ATTEMPTS} failed for #{job.key}: #{ex.message}")
              channel.send payload
            else
              attempts.delete(payload)
              log.error("Giving up on #{job.key} after #{tries} attempts: #{ex.message}")
            end
          end
        end
      end
    end

    # no_ack: true lets RabbitMQ forget each message on delivery; anything still
    # buffered in `channel` is lost if the process dies.
    queue.subscribe(no_ack: true, block: true) do |msg|
      channel.send(msg.body_io.to_s)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment