# @thelinuxlich, created June 21, 2020
# Crystal updated scraper (FINAL)
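#
# Consumes scrape jobs from a NATS queue, fetches MercadoLivre listing pages and
# the public MercadoLibre API, and upserts vendors, items and price/visit metrics
# into Postgres.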
require "json"
require "pg"
require "nats"
require "log"
require "openssl"
require "uri"
require "pool/connection"
struct Seller
  include JSON::Serializable
  getter nickname : String
  getter permalink : String
  getter seller_reputation : SellerReputation
end

struct SellerReputation
  include JSON::Serializable
  getter transactions : Transactions
end

struct Transactions
  include JSON::Serializable
  getter completed : Int32
  getter canceled : Int32
end

struct Job
  include JSON::Serializable
  getter key : String
  getter permalink : String
end
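
# Browser-like User-Agent used for the listing-page requests (assumption: plain
# HTTP clients would be blocked or served a stripped-down page).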
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36"
headers = HTTP::Headers{"User-Agent" => user_agent}
PG_URL = "postgres://#{ENV["PGUSER"]}:#{ENV["PGPASSWORD"]}@#{ENV["PGHOST"]}:5432/#{ENV["PGDATABASE"]}"
Log.info { "Starting the listing price capture service..." }
channel = Channel(String).new 10_000_000 # we need a buffered channel because of NATS, otherwise it may drop the connection because of "slow consumer"
WORKERS = 500
INITIAL = 100
HTTP_TIMEOUT = 30.0
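
# SQL statements: upsert vendors by key, update the item row, and insert one
# metrics row (ON CONFLICT DO NOTHING relies on whatever unique constraint
# ITEMS_METRICS defines; CREATED_AT is passed as a %F date, suggesting one row
# per item per day).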
vendors_query = "INSERT INTO VENDORS(NAME, KEY, PERMALINK, TRANSACTIONS_COMPLETED, TRANSACTIONS_CANCELLED) VALUES ($1,$2,$3,$4,$5) ON CONFLICT (key)
DO UPDATE SET
  name = EXCLUDED.name,
  permalink = EXCLUDED.permalink,
  transactions_completed = EXCLUDED.transactions_completed,
  transactions_cancelled = EXCLUDED.transactions_cancelled,
  updated_at = CURRENT_TIMESTAMP"
items_query = "UPDATE ITEMS SET START_TIME = $2, ALTERNATE_ITEM_ID = $3, UPDATED_AT = $4, THUMB = $5, VENDOR_ID = $6 WHERE KEY = $1"
metrics_query = "INSERT INTO ITEMS_METRICS(ITEM_ID, SOLD_COUNT, PRICE, VISITS, AVAILABLE_QUANTITY, CREATED_AT) VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING"
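
# Pause before connecting (assumption: gives Postgres and NATS time to come up
# when everything starts together).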
sleep 10.seconds
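# Connection pools sized to the worker count: one for Postgres, one for
# api.mercadolibre.com, and one per MercadoLivre storefront host. TLS
# verification is disabled (Context::Client.insecure).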
pg_pool = ConnectionPool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) do
  PG.connect PG_URL
end
tls = OpenSSL::SSL::Context::Client.insecure
api_mercadolibre_pool = ConnectionPool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) { HTTP::Client.new "api.mercadolibre.com", tls: tls }
http_pools = ["adulto.mercadolivre.com.br", "produto.mercadolivre.com.br", "caminhao.mercadolivre.com.br", "carro.mercadolivre.com.br",
              "construcao-manutencao.mercadolivre.com.br", "festa-evento.mercadolivre.com.br", "moto.mercadolivre.com.br",
              "servico.mercadolivre.com.br", "suporte-tecnico.mercadolivre.com.br", "transporte-veiculo.mercadolivre.com.br",
              "veiculo.mercadolivre.com.br", "www.mercadolivre.com.br"].to_h do |i|
  {i, ConnectionPool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) { HTTP::Client.new i, tls: tls }}
end
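
# Regexes that extract the item state embedded in the listing page HTML, plus
# fallbacks for sold count, price and thumbnail. The in-memory hashes cache
# processed items and raw seller payloads.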
metrics_regex = /\"(sellerId|startTime|localItemPrice|itemId|quantity|availableStock|soldStock)\"\:([a-zA-Z0-9\:\.\-\"]*)\,/
alternative_sold_count_regex = /<div class="item-conditions">(.*?)<\/div>/
alternative_price_regex = /\{"type":"price","value":(.*?),/
thumb_regex = /a href="(.*?)" class="gallery-trigger/
all_but_numbers = "^0-9"
date_format = "%F"
ok = "1"
captures = ["sellerId", "startTime", "localItemPrice", "itemId", "availableStock", "soldStock"]
items_cache = Hash(String, Int32).new
sellers_cache = Hash(String, String).new
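
# Forward every message from the NATS queue into the channel; on error, log and
# resubscribe.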
def listen_queue(nc, channel)
  nc.subscribe(ENV["NATS_QUEUE"]) do |msg|
    channel.send(msg.to_s)
  end
  sleep
rescue ex
  Log.fatal { ex }
  listen_queue nc, channel
end
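
# Connect to NATS and log at fatal level if the connection closes or errors.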
nc = NATS::Connection.new "nats://#{ENV["NATS_USER"]}:#{ENV["NATS_PASSWORD"]}@#{ENV["NATS_HOST"]}:#{ENV["NATS_PORT"].to_i}"
nc.on_close { Log.fatal { "NATS: Connection closed!" } }
nc.on_error { |e| Log.fatal { "NATS ERROR: #{e}" } }
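
# Fan out WORKERS fibers; each one pulls jobs from the channel and processes
# them end to end: fetch the listing page, parse metrics, enrich via the API,
# then persist to Postgres.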
WORKERS.times do
  spawn do
    loop do
      payload = channel.receive
      job = Job.from_json payload
      if items_cache[job.key]?
        next
      else
        begin
          data = nil
          http_key = nil
          http_pools.each_key do |k|
            http_key = k if job.permalink.index(k)
          end
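          # Drop the job if no pool matches the permalink host; otherwise fetch
          # the listing page through the matching pooled HTTP client, re-queuing
          # the payload if the request fails.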
          if !http_key
            next
          else
            data = nil
            http_pools[http_key].connection do |client|
              begin
                data = client.get(URI.parse(job.permalink).path, headers: headers)
              rescue ex
                Log.fatal { "#{ex} #{ex.backtrace?}" }
                channel.send payload
              end
            end
            next if !data
            if data.status_code < 400
              item = data.body
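              # Scan the page body for the embedded item state: seller id,
              # start time, price, alternate item id, stock and sold count.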
              _start_time = nil
              seller_id = nil
              _price = nil
              alternate_item_id = nil
              _available_quantity = nil
              _sold_count = nil
              item.scan(metrics_regex) do |n|
                case n[1]
                when captures[0]
                  seller_id = n[2]
                when captures[1]
                  _start_time = n[2]
                when captures[2]
                  _price = n[2]
                when captures[3]
                  alternate_item_id = n[2]
                when captures[4]
                  _available_quantity = n[2]
                when captures[5]
                  _sold_count = n[2]
                end
              end
              _sold_count ||= item.match(alternative_sold_count_regex).try(&.[1])
              _thumb = item.match thumb_regex
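              # Normalize the scraped strings into typed values, falling back to
              # the alternative price markup and zero defaults when fields are
              # missing or non-numeric.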
              start_time = _start_time || Time.utc(1970, 1, 1, 0, 0, 0).to_rfc3339
              price = 0_f32
              available_quantity = 0
              sold_count = 0
              _price = _price.delete(all_but_numbers) if _price
              if _price && _price.to_f?
                price = _price.to_f
              else
                _price = item.match(alternative_price_regex).try(&.[1])
                if _price
                  price = _price.to_f
                else
                  Log.error { "bad price at #{job.permalink}" }
                  next
                end
              end
              if _available_quantity
                __available_quantity = _available_quantity.delete(all_but_numbers)
                if __available_quantity.to_i?
                  available_quantity = __available_quantity.to_i
                end
              end
              if _sold_count
                __sold_count = _sold_count.delete(all_but_numbers)
                if __sold_count.to_i?
                  sold_count = __sold_count.to_i
                end
              end
              thumb = _thumb.try(&.[1])
              visits = 0
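              # Enrich with seller data (cached by seller id) and the item's
              # visit count from the public API; failures re-queue the payload.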
              vendor = nil
              api_mercadolibre_pool.connection do |client|
                begin
                  if seller_id
                    if sellers_cache[seller_id]?
                      seller = Seller.from_json sellers_cache[seller_id]
                    else
                      _seller = client.get("/users/#{seller_id}", headers: headers)
                      if _seller.status_code < 400
                        seller = Seller.from_json _seller.body
                        sellers_cache[seller_id] = _seller.body
                      end
                    end
                    vendor = !seller ? nil : {name: seller.nickname, key: seller_id, permalink: seller.permalink, transactions_completed: seller.seller_reputation.transactions.completed, transactions_cancelled: seller.seller_reputation.transactions.canceled}
                    url_visit = client.get("/visits/items?ids=#{job.key}", headers: headers)
                    if url_visit.status_code < 400
                      _visits = JSON.parse url_visit.body
                      if _visits.try(&.[job.key]).try(&.as_i?)
                        visits += _visits[job.key].as_i
                      end
                    end
                  end
                rescue ex
                  Log.fatal { "#{ex} #{ex.backtrace?}" }
                  channel.send payload
                end
              end
              next if !vendor
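              # Persist vendor, item and daily metrics in a single transaction;
              # on failure the payload is re-queued.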
              db_saved = false
              item_update = {start_time: start_time, alternate_item_id: alternate_item_id, updated_at: Time.utc.to_rfc3339, thumb: thumb, vendor_id: seller_id}
              pg_pool.connection do |client|
                begin
                  client.transaction do |tx|
                    tx.connection.exec vendors_query, vendor[:name], vendor[:key], vendor[:permalink], vendor[:transactions_completed], vendor[:transactions_cancelled]
                    tx.connection.exec items_query, job.key, item_update[:start_time], item_update[:alternate_item_id], Time.utc.to_rfc3339, item_update[:thumb], item_update[:vendor_id]
                    tx.connection.exec metrics_query, job.key, sold_count, price, visits, available_quantity, Time.utc.to_s(date_format)
                  end
                  db_saved = true
                rescue ex
                  Log.fatal { "#{ex} #{ex.backtrace?}" }
                  channel.send payload
                end
              end
            else
              Log.fatal { "URL #{job.permalink} returned status #{data.status_code}" }
            end
          end
          items_cache[job.key] = 1 if db_saved
        rescue ex
          Log.fatal { "GENERAL ERROR #{ex} #{ex.backtrace?}" }
          next
        end
      end
    end
  end
end
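
# Subscribe and block the main fiber forever so the worker fibers keep running.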
listen_queue nc, channel