-
-
Save thelinuxlich/e5162819c2d05aaf77e1a4024a425ab9 to your computer and use it in GitHub Desktop.
Crystal updated scraper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "json" | |
require "pg" | |
require "amqp-client" | |
require "logger" | |
require "openssl" | |
require "uri" | |
require "pool" | |
# Turn off write buffering on standard output so log lines show up immediately.
STDOUT.sync = true

# Process-wide logger writing to standard output at DEBUG level.
log = Logger.new(STDOUT).tap do |logger|
  logger.level = Logger::DEBUG
end
# Seller record deserialized from the JSON body returned by the
# `/users/<seller_id>` request made by the workers.
struct Seller
  include JSON::Serializable

  # Seller display name; stored as the vendor NAME.
  getter nickname : String

  # URL of the seller's public page; stored as the vendor PERMALINK.
  getter permalink : String

  # Nested reputation object holding the transaction counters.
  getter seller_reputation : SellerReputation
end
# The `seller_reputation` object nested inside a `Seller` JSON document.
# Only the `transactions` member is mapped.
struct SellerReputation
  include JSON::Serializable

  # Completed/canceled transaction counters.
  getter transactions : Transactions
end
# The `transactions` object nested inside `SellerReputation`.
struct Transactions
  include JSON::Serializable

  # Number of completed transactions.
  getter completed : Int32

  # Number of canceled transactions.
  getter canceled : Int32
end
# One unit of work, deserialized from a queue message payload.
struct Job
  include JSON::Serializable

  # Item key; used to look up visits and to address the ITEMS row.
  getter key : String

  # Full URL of the listing page to download.
  getter permalink : String
end
# ---------------------------------------------------------------------------
# Shared configuration and resources used by the worker fibers further down.
# ---------------------------------------------------------------------------

# Browser-like User-Agent sent with every HTTP request.
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36"
headers = HTTP::Headers{"User-Agent" => user_agent}

# PostgreSQL connection URL assembled from the environment (port fixed at 5432).
PG_URL = "postgres://#{ENV["PGUSER"]}:#{ENV["PGPASSWORD"]}@#{ENV["PGHOST"]}:5432/#{ENV["PGDATABASE"]}"

# Fixed start-up delay before any connection is opened.
# NOTE(review): presumably gives the database/broker time to come up — confirm.
sleep 10.seconds
p "Iniciando serviço de captura de preços em anúncios..."

# Hand-off channel between the queue subscriber and the workers.
channel = Channel(String).new

WORKERS      = 100  # number of worker fibers and HTTP pool capacity
INITIAL      =  50  # resources created eagerly per pool
HTTP_TIMEOUT = 60.0 # passed to every pool as its `timeout`

# Each pooled entry is the array of prepared statements for one connection:
#   [0] vendor upsert, [1] item update, [2] item-metrics insert.
pg_pool = Pool.new(capacity: WORKERS + 100, initial: INITIAL, timeout: HTTP_TIMEOUT) do
  db = PG.connect PG_URL
  upsert_vendor = db.prepared("INSERT INTO VENDORS(NAME, KEY, PERMALINK,TRANSACTIONS_COMPLETED, TRANSACTIONS_CANCELLED) VALUES ($1,$2,$3,$4,$5) ON CONFLICT (key)
DO UPDATE SET
name = EXCLUDED.name,
permalink = EXCLUDED.PERMALINK,
transactions_completed = EXCLUDED.transactions_completed,
transactions_cancelled = EXCLUDED.transactions_cancelled,
updated_at = CURRENT_TIMESTAMP")
  update_item = db.prepared("UPDATE ITEMS SET START_TIME = $2, ALTERNATE_ITEM_ID = $3,UPDATED_AT = $4,THUMB = $5, VENDOR_ID = $6 WHERE KEY = $1")
  insert_metrics = db.prepared("INSERT INTO ITEMS_METRICS(ITEM_ID, SOLD_COUNT, PRICE, VISITS, AVAILABLE_QUANTITY,CREATED_AT) VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING")
  [upsert_vendor, update_item, insert_metrics]
end

# NOTE(review): `insecure` turns off TLS certificate verification for every
# HTTP client below — confirm this is intentional.
tls = OpenSSL::SSL::Context::Client.insecure

# Running number of jobs received by the workers (used in log output only).
count = 0

# Clients for the REST API (seller and visits lookups).
api_mercadolibre_pool = Pool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) { HTTP::Client.new "api.mercadolibre.com", tls: tls }

# One client pool per listing host; a job is routed by the host name that
# appears in its permalink.
listing_hosts = %w(
  produto.mercadolivre.com.br
  caminhao.mercadolivre.com.br
  carro.mercadolivre.com.br
  construcao-manutencao.mercadolivre.com.br
  festa-evento.mercadolivre.com.br
  moto.mercadolivre.com.br
  servico.mercadolivre.com.br
  suporte-tecnico.mercadolivre.com.br
  transporte-veiculo.mercadolivre.com.br
  veiculo.mercadolivre.com.br
  www.mercadolivre.com.br
)
http_pools = listing_hosts.to_h do |host|
  {host, Pool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) { HTTP::Client.new host, tls: tls }}
end

# Patterns applied to the downloaded listing HTML.
metrics_regex = /\"(sellerId|startTime|localItemPrice|itemId|quantity|availableStock|soldStock)\"\:([a-zA-Z0-9\:\.\-\"]*)\,/
alternative_sold_count_regex = /<div class="item-conditions">(.*?)<\/div>/
alternative_price_regex = /\{"type":"price","value":(.*?),/
thumb_regex = /a href="(.*?)" class="gallery-trigger/

# Character set handed to `String#delete` to strip everything except digits.
all_but_numbers = "^0-9"
# Format for the metrics CREATED_AT value (date only).
date_format = "%F"
# NOTE(review): `ok` is not referenced anywhere else in this file.
ok = "1"

# Names matched by `metrics_regex` that the workers act on, by position.
captures = ["sellerId", "startTime", "localItemPrice", "itemId", "availableStock", "soldStock"]

# Keys of items already processed by this process.
items_cache = {} of String => Int32
# Connects to RabbitMQ, starts WORKERS scraping fibers fed through `channel`,
# then blocks on the queue subscription, forwarding every message body to them.
#
# For each job a worker:
#   1. downloads the listing page from the pool matching the permalink's host,
#   2. extracts seller id, start time, price, stock and sold count from the HTML,
#   3. fetches the seller and the visit count from the REST API,
#   4. writes vendor, item and metrics rows through the prepared statements.
# A job that raises is logged and put back on `channel` for another attempt.
AMQP::Client.start("amqp://#{ENV["RABBITMQ_USER"]}:#{ENV["RABBITMQ_PASSWORD"]}@#{ENV["RABBITMQ_HOST"]}:#{ENV["RABBITMQ_PORT"].to_i}") do |c|
  c.channel do |ch|
    queue = ch.queue(ENV["RABBITMQ_QUEUE"])

    # `channel` is unbuffered and the workers are its only receivers. A worker
    # that re-sends inline blocks until some other worker is free; if every
    # worker fails at once they all block in `send` and nothing ever receives.
    # Re-queue from a throwaway fiber so the worker returns to `receive`.
    requeue = ->(body : String) { spawn { channel.send body } }

    WORKERS.times do
      spawn do
        loop do
          payload = channel.receive

          # A malformed message must not kill the worker fiber, and retrying
          # it cannot succeed, so it is logged and dropped.
          job = begin
            Job.from_json payload
          rescue ex : JSON::ParseException
            log.error(ex)
            next
          end

          count += 1
          p "[#{count}] Item #{job.key} passando pela atualização de preços..."

          if items_cache.has_key?(job.key)
            p "Item #{job.key} já passou pela atualização de preços..."
            next
          end

          begin
            # Route by host name found in the permalink (last match wins).
            http_key = nil
            http_pools.each_key do |k|
              http_key = k if job.permalink.index(k)
            end
            # Unknown host: the job is skipped without being cached or retried.
            next unless http_key

            # --- 1. download the listing page --------------------------------
            page_pool = http_pools[http_key]
            page_client = page_pool.checkout
            item = begin
              page_client.get(URI.parse(job.permalink).path, headers: headers).body
            rescue ex
              # Drop a possibly half-read connection; the client reconnects
              # on its next request.
              page_client.close
              raise ex
            ensure
              # Always hand the client back, otherwise the pool drains.
              page_pool.checkin page_client
            end

            # --- 2. extract raw values from the HTML -------------------------
            seller_id = nil
            raw_start_time = nil
            raw_price = nil
            alternate_item_id = nil
            raw_available_quantity = nil
            raw_sold_count = nil
            item.scan(metrics_regex) do |m|
              case m[1]
              when captures[0] then seller_id = m[2]              # sellerId
              when captures[1] then raw_start_time = m[2]         # startTime
              when captures[2] then raw_price = m[2]              # localItemPrice
              when captures[3] then alternate_item_id = m[2]      # itemId
              when captures[4] then raw_available_quantity = m[2] # availableStock
              when captures[5] then raw_sold_count = m[2]         # soldStock
              end
            end
            raw_sold_count ||= item.match(alternative_sold_count_regex).try(&.[1])

            # The gallery link was matched before but never used, so THUMB was
            # always written as NULL; pass the captured URL through instead.
            thumb = item.match(thumb_regex).try(&.[1])

            start_time = raw_start_time || Time.utc(1970, 1, 1, 0, 0, 0).to_rfc3339

            # Price: embedded value first, then the alternative pattern. Both
            # use `to_f?` so unparsable text falls through to the message
            # below instead of raising and re-queuing the job forever.
            price = raw_price.try(&.to_f?) ||
                    item.match(alternative_price_regex).try { |m| m[1].to_f? }
            unless price
              p "price errado em #{job.permalink}"
              price = 0_f32
            end

            available_quantity = 0
            if raw_available_quantity
              if parsed = raw_available_quantity.delete(all_but_numbers).to_i?
                available_quantity = parsed
              else
                p "available_quantity errado em #{job.permalink}"
              end
            end

            sold_count = 0
            if raw_sold_count
              if parsed = raw_sold_count.delete(all_but_numbers).to_i?
                sold_count = parsed
              else
                p "sold_count errado em #{job.permalink}"
              end
            end

            # Without a seller id the job is retried. This is decided before
            # an API client is checked out so none is leaked on this path.
            unless seller_id
              p "não havia seller_id em #{job.permalink}"
              requeue.call payload
              next
            end

            # --- 3. seller and visits from the REST API ----------------------
            seller = nil
            visits = 0
            api_client = api_mercadolibre_pool.checkout
            begin
              seller_response = api_client.get("/users/#{seller_id}", headers: headers)
              if seller_response.status_code < 400
                seller = Seller.from_json seller_response.body
              else
                p "Não havia seller para id #{seller_id}"
              end

              visits_response = api_client.get("/visits/items?ids=#{job.key}", headers: headers)
              if visits_response.status_code < 400
                # `[]?` so a response without this item's key counts as zero
                # visits instead of raising KeyError.
                if total = JSON.parse(visits_response.body)[job.key]?.try(&.as_i?)
                  visits = total
                end
              end
            rescue ex
              api_client.close
              raise ex
            ensure
              api_mercadolibre_pool.checkin api_client
            end

            # --- 4. persist ---------------------------------------------------
            statements = pg_pool.checkout
            begin
              # The item row is only updated when the vendor could be
              # upserted, as it stores the vendor id.
              if vendor = seller
                statements[0].exec vendor.nickname, seller_id, vendor.permalink, vendor.seller_reputation.transactions.completed, vendor.seller_reputation.transactions.canceled
                statements[1].exec job.key, start_time, alternate_item_id, Time.utc.to_rfc3339, thumb, seller_id
              end
              statements[2].exec job.key, sold_count, price, visits, available_quantity, Time.utc.to_s(date_format)
            ensure
              pg_pool.checkin statements
            end

            items_cache[job.key] = 1
          rescue ex
            log.fatal(ex)
            requeue.call payload
          end
        end
      end
    end

    # Blocks here; messages are auto-acknowledged (`no_ack: true`) and their
    # bodies handed to whichever worker is free.
    queue.subscribe(no_ack: true, block: true) do |msg|
      channel.send(msg.body_io.to_s)
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment