Skip to content

Instantly share code, notes, and snippets.

@thelinuxlich
Created June 19, 2020 20:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thelinuxlich/e5162819c2d05aaf77e1a4024a425ab9 to your computer and use it in GitHub Desktop.
Save thelinuxlich/e5162819c2d05aaf77e1a4024a425ab9 to your computer and use it in GitHub Desktop.
Crystal updated scraper
require "json"
require "pg"
require "amqp-client"
require "logger"
require "openssl"
require "uri"
require "pool"
# Flush STDOUT on every write so `p`/logger output appears immediately
# (presumably so container log collectors see it in real time — confirm).
STDOUT.sync = true
# Process-wide logger on STDOUT at DEBUG verbosity; the worker fibers report
# per-job exceptions through it.
log = Logger.new(STDOUT, level: Logger::DEBUG)
# Seller profile decoded from the api.mercadolibre.com `/users/:id` response.
# Only the fields that end up in the VENDORS table are mapped.
struct Seller
  include JSON::Serializable

  getter nickname : String, permalink : String
  getter seller_reputation : SellerReputation
end
# The `seller_reputation` object nested inside a Seller; only the transaction
# counters are kept.
struct SellerReputation
  include JSON::Serializable

  getter(transactions : Transactions)
end
# Completed / canceled transaction counters of a seller, persisted as
# VENDORS.TRANSACTIONS_COMPLETED / TRANSACTIONS_CANCELLED.
struct Transactions
  include JSON::Serializable

  getter completed : Int32, canceled : Int32
end
# One unit of work as delivered in a RabbitMQ message body: the item key and
# the listing URL to scrape.
struct Job
  include JSON::Serializable

  getter key : String, permalink : String
end
# Desktop Chrome User-Agent sent with every request to the storefront and API hosts.
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36"
headers = HTTP::Headers.new
headers["User-Agent"] = user_agent

# Postgres URL built from the PG* environment variables (port fixed at 5432).
# `ENV[]` raises KeyError if any of them is unset.
# NOTE(review): user/password are interpolated raw; a password containing
# '@', ':' or '/' would produce a broken URL — consider percent-encoding.
PG_URL = String.build do |url|
  url << "postgres://" << ENV["PGUSER"] << ':' << ENV["PGPASSWORD"]
  url << '@' << ENV["PGHOST"] << ":5432/" << ENV["PGDATABASE"]
end

# Fixed startup delay, presumably to let Postgres/RabbitMQ come up first — confirm.
sleep(10.seconds)
p "Iniciando serviço de captura de preços em anúncios..."

# Unbuffered hand-off from the AMQP subscriber to the worker fibers.
channel = Channel(String).new

WORKERS      = 100  # worker fibers reading `channel`; also the HTTP pool capacity
INITIAL      =  50  # `initial:` size passed to every Pool
HTTP_TIMEOUT = 60.0 # `timeout:` passed to every Pool (presumably seconds)
# Pool of Postgres connections. Each pooled value is an Array of three prepared
# statements that the workers address by index:
#   [0] vendor upsert  (VENDORS, updates the row on conflicting key)
#   [1] item update    (ITEMS, matched by KEY = $1)
#   [2] metrics insert (ITEMS_METRICS, conflicting rows are ignored)
# NOTE(review): the timeout reuses HTTP_TIMEOUT even though this pool is for Postgres.
pg_pool = Pool.new(capacity: WORKERS + 100, initial: INITIAL, timeout: HTTP_TIMEOUT) do
  conn = PG.connect PG_URL
  vendor_stmt = conn.prepared("INSERT INTO VENDORS(NAME, KEY, PERMALINK,TRANSACTIONS_COMPLETED, TRANSACTIONS_CANCELLED) VALUES ($1,$2,$3,$4,$5) ON CONFLICT (key)
DO UPDATE SET
name = EXCLUDED.name,
permalink = EXCLUDED.PERMALINK,
transactions_completed = EXCLUDED.transactions_completed,
transactions_cancelled = EXCLUDED.transactions_cancelled,
updated_at = CURRENT_TIMESTAMP")
  item_stmt = conn.prepared("UPDATE ITEMS SET START_TIME = $2, ALTERNATE_ITEM_ID = $3,UPDATED_AT = $4,THUMB = $5, VENDOR_ID = $6 WHERE KEY = $1")
  metrics_stmt = conn.prepared("INSERT INTO ITEMS_METRICS(ITEM_ID, SOLD_COUNT, PRICE, VISITS, AVAILABLE_QUANTITY,CREATED_AT) VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING")
  [vendor_stmt, item_stmt, metrics_stmt]
end
# Shared TLS context for every HTTP client.
# NOTE(review): `insecure` disables certificate verification, which exposes the
# scraper to MITM; consider `OpenSSL::SSL::Context::Client.new` once CA
# certificates are available in the runtime environment.
tls = OpenSSL::SSL::Context::Client.insecure

# Running total of jobs taken off the channel; printed by the workers.
count = 0

# Clients for the public API (seller profile and visit counts).
api_mercadolibre_pool = Pool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) do
  HTTP::Client.new "api.mercadolibre.com", tls: tls
end

# One client pool per storefront host a listing permalink may point at,
# keyed by host name (insertion order preserved).
storefront_hosts = %w(
  produto.mercadolivre.com.br caminhao.mercadolivre.com.br carro.mercadolivre.com.br
  construcao-manutencao.mercadolivre.com.br festa-evento.mercadolivre.com.br moto.mercadolivre.com.br
  servico.mercadolivre.com.br suporte-tecnico.mercadolivre.com.br transporte-veiculo.mercadolivre.com.br
  veiculo.mercadolivre.com.br www.mercadolivre.com.br
)
http_pools = storefront_hosts.to_h do |host|
  {host, Pool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) { HTTP::Client.new host, tls: tls }}
end
# `"key":value,` pairs embedded in the listing HTML. Group 1 is the key, group 2
# the raw value (surrounding quotes are captured when the value is a JSON string).
metrics_regex = /\"(sellerId|startTime|localItemPrice|itemId|quantity|availableStock|soldStock)\"\:([a-zA-Z0-9\:\.\-\"]*)\,/
# Fallback for the sold count when `soldStock` is absent.
alternative_sold_count_regex = /<div class="item-conditions">(.*?)<\/div>/
# Fallback for the price when `localItemPrice` is absent or not numeric.
alternative_price_regex = /\{"type":"price","value":(.*?),/
# Link to the listing's main gallery image.
thumb_regex = /a href="(.*?)" class="gallery-trigger/

# `String#delete` character set meaning "everything except ASCII digits".
all_but_numbers = "^0-9"
# Date-only format (YYYY-MM-DD) used for ITEMS_METRICS.CREATED_AT.
date_format = "%F"
# NOTE(review): not referenced anywhere in this file.
ok = "1"
# metrics_regex keys the workers act on, addressed by index; "quantity" is
# matched by the regex but not listed here.
captures = %w(sellerId startTime localItemPrice itemId availableStock soldStock)
# Item keys already processed during this run (the value is just a marker).
items_cache = {} of String => Int32
# Connects to RabbitMQ, spawns WORKERS fibers that process jobs from `channel`,
# then subscribes to RABBITMQ_QUEUE and forwards every message body into `channel`.
#
# Per job, a worker:
#   1. fetches the listing HTML from the pool matching the permalink's host,
#   2. scrapes price / stock / sold count / seller id / thumb with the regexes,
#   3. asks api.mercadolibre.com for the seller profile and the visit count,
#   4. upserts the vendor, updates the item and inserts a metrics row.
# Jobs that fail with an exception, or whose page has no seller id, are re-enqueued.
#
# NOTE(review): with `no_ack: true` the broker does not wait for acknowledgements,
# so jobs buffered in memory are presumably lost if the process dies — confirm.
# NOTE(review): `items_cache` is only ever added to, so it grows for the life of
# the process; failing jobs are retried without limit.
AMQP::Client.start("amqp://#{ENV["RABBITMQ_USER"]}:#{ENV["RABBITMQ_PASSWORD"]}@#{ENV["RABBITMQ_HOST"]}:#{ENV["RABBITMQ_PORT"].to_i}") do |c|
  c.channel do |ch|
    queue = ch.queue(ENV["RABBITMQ_QUEUE"])

    # Re-enqueue from a fresh fiber. `channel` is unbuffered, so a worker sending
    # to it directly blocks until another worker receives; if every worker does
    # that at the same time, nothing is left to receive and the pipeline deadlocks.
    requeue = ->(body : String) { spawn { channel.send body } }

    WORKERS.times do
      spawn do
        loop do
          payload = channel.receive

          # A malformed payload can never succeed, so drop it instead of retrying.
          # Previously the exception escaped the fiber and killed this worker.
          job = begin
            Job.from_json payload
          rescue parse_error : JSON::ParseException
            log.error("Payload inválido descartado: #{payload} (#{parse_error.message})")
            next
          end

          count += 1
          p "[#{count}] Item #{job.key} passando pela atualização de preços..."
          if items_cache[job.key]?
            p "Item #{job.key} já passou pela atualização de preços..."
            next
          end

          begin
            uri = URI.parse(job.permalink)
            # Exact host lookup. Substring matching kept the LAST key found, so a
            # "transporte-veiculo..." permalink was routed to the
            # "veiculo.mercadolivre.com.br" pool.
            http_pool = uri.host.try { |host| http_pools[host]? }
            unless http_pool
              p "host desconhecido em #{job.permalink}"
              next
            end

            web_client = http_pool.checkout
            response = begin
              web_client.get(uri.path, headers: headers)
            rescue fetch_error
              # Drop a possibly broken connection; HTTP::Client reconnects on next use.
              web_client.close
              raise fetch_error
            ensure
              # Always hand the client back, otherwise the pool slowly runs dry.
              http_pool.checkin web_client
            end
            item = response.body

            _start_time = nil
            seller_id = nil
            _price = nil
            alternate_item_id = nil
            _available_quantity = nil
            _sold_count = nil
            item.scan(metrics_regex) do |n|
              case n[1]
              when captures[0] then seller_id = n[2]
              when captures[1] then _start_time = n[2]
              when captures[2] then _price = n[2]
              when captures[3] then alternate_item_id = n[2]
              when captures[4] then _available_quantity = n[2]
              when captures[5] then _sold_count = n[2]
              end
            end
            _sold_count ||= item.match(alternative_sold_count_regex).try(&.[1])
            start_time = _start_time || Time.utc(1970, 1, 1, 0, 0, 0).to_rfc3339
            # The thumb used to be matched and then discarded, so THUMB was always
            # written as NULL.
            thumb = item.match(thumb_regex).try(&.[1])

            price = 0_f32
            available_quantity = 0
            sold_count = 0

            # `to_f?` on the fallback too: a non-numeric value used to raise and
            # send the job into an endless requeue loop.
            if _price && (parsed_price = _price.to_f?)
              price = parsed_price
            elsif (fallback_price = item.match(alternative_price_regex).try(&.[1].to_f?))
              price = fallback_price
            else
              p "price errado em #{job.permalink}"
            end

            if _available_quantity
              if (parsed_quantity = _available_quantity.delete(all_but_numbers).to_i?)
                available_quantity = parsed_quantity
              else
                p "available_quantity errado em #{job.permalink}"
              end
            end

            if _sold_count
              if (parsed_sold = _sold_count.delete(all_but_numbers).to_i?)
                sold_count = parsed_sold
              else
                p "sold_count errado em #{job.permalink}"
              end
            end

            # Checked before taking an API client: the old code checked one out
            # and never returned it on this path.
            unless seller_id
              p "não havia seller_id em #{job.permalink}"
              requeue.call payload
              next
            end

            seller = nil
            visits = 0
            api_client = api_mercadolibre_pool.checkout
            begin
              seller_response = api_client.get("/users/#{seller_id}", headers: headers)
              if seller_response.status_code < 400
                seller = Seller.from_json seller_response.body
              else
                p "Não havia seller para id #{seller_id}"
              end

              visits_response = api_client.get("/visits/items?ids=#{job.key}", headers: headers)
              if visits_response.status_code < 400
                # `[]?`: a response without this key used to raise and requeue the job.
                item_visits = JSON.parse(visits_response.body)[job.key]?.try(&.as_i?)
                visits += item_visits if item_visits
              end
            rescue api_error
              api_client.close
              raise api_error
            ensure
              api_mercadolibre_pool.checkin api_client
            end

            item_update = {start_time: start_time, alternate_item_id: alternate_item_id, updated_at: Time.utc.to_rfc3339, thumb: thumb, vendor_id: seller_id}
            vendor = if seller
                       {name: seller.nickname, key: seller_id, permalink: seller.permalink, transactions_completed: seller.seller_reputation.transactions.completed, transactions_cancelled: seller.seller_reputation.transactions.canceled}
                     end

            # [0] vendor upsert, [1] item update, [2] metrics insert (see pg_pool).
            statements = pg_pool.checkout
            begin
              if vendor
                statements[0].exec vendor[:name], vendor[:key], vendor[:permalink], vendor[:transactions_completed], vendor[:transactions_cancelled]
                statements[1].exec job.key, item_update[:start_time], item_update[:alternate_item_id], item_update[:updated_at], item_update[:thumb], item_update[:vendor_id]
              end
              statements[2].exec job.key, sold_count, price, visits, available_quantity, Time.utc.to_s(date_format)
            ensure
              pg_pool.checkin statements
            end

            items_cache[job.key] = 1
          rescue ex
            # Recoverable: the worker keeps running, so log at error level with a
            # backtrace rather than `fatal` with only the message.
            log.error(ex.inspect_with_backtrace)
            requeue.call payload
          end
        end
      end
    end

    queue.subscribe(no_ack: true, block: true) do |_msg|
      channel.send(_msg.body_io.to_s)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment