-
-
Save thelinuxlich/7c29b8b3c39913b9966c0ab259053dbb to your computer and use it in GitHub Desktop.
Crystal updated scraper (FINAL)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "http/client" # HTTP::Client / HTTP::Headers are used below but were never required explicitly (presumably loaded transitively — TODO confirm)
require "json"
require "log"
require "openssl"
require "uri"

require "nats"
require "pg"
require "pool/connection"
# Seller record deserialised from the mercadolibre `/users/:id` API response.
struct Seller
  include JSON::Serializable

  # Display name and public profile URL of the seller.
  getter nickname : String, permalink : String
  # Nested reputation block; only the transaction counters are consumed downstream.
  getter seller_reputation : SellerReputation
end
# Wrapper around the `seller_reputation` object of the users API;
# exposes just the `transactions` sub-object.
struct SellerReputation
  include JSON::Serializable

  getter transactions : Transactions
end
# Completed/cancelled transaction counters from the seller reputation payload.
struct Transactions
  include JSON::Serializable

  getter completed : Int32, canceled : Int32
end
# One unit of work pulled off the NATS queue: the item's key plus the
# public listing URL that will be scraped.
struct Job
  include JSON::Serializable

  getter key : String, permalink : String
end
# ---------------------------------------------------------------------------
# Configuration and shared state for the scraper workers.
# ---------------------------------------------------------------------------

# Desktop Chrome User-Agent so mercadolivre serves the normal HTML pages.
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36"
headers = HTTP::Headers{"User-Agent" => user_agent}
# Postgres connection string assembled from the environment (port fixed at 5432).
PG_URL = "postgres://#{ENV["PGUSER"]}:#{ENV["PGPASSWORD"]}@#{ENV["PGHOST"]}:5432/#{ENV["PGDATABASE"]}"
Log.info { "Iniciando serviço de captura de preços em anúncios..." }
channel = Channel(String).new 10_000_000 # we need a buffered channel because of NATS, otherwise it may drop the connection because of "slow consumer"
WORKERS = 500      # fiber count and capacity of every connection pool
INITIAL = 100      # connections opened eagerly per pool
HTTP_TIMEOUT = 30.0 # pool checkout timeout in seconds
# Upsert a seller, keyed on KEY; refreshes name/permalink/counters on conflict.
vendors_query = "INSERT INTO VENDORS(NAME, KEY, PERMALINK,TRANSACTIONS_COMPLETED, TRANSACTIONS_CANCELLED) VALUES ($1,$2,$3,$4,$5) ON CONFLICT (key)
DO UPDATE SET
name = EXCLUDED.name,
permalink = EXCLUDED.PERMALINK,
transactions_completed = EXCLUDED.transactions_completed,
transactions_cancelled = EXCLUDED.transactions_cancelled,
updated_at = CURRENT_TIMESTAMP"
# Refresh scraped metadata for an existing item row.
items_query = "UPDATE ITEMS SET START_TIME = $2, ALTERNATE_ITEM_ID = $3,UPDATED_AT = $4,THUMB = $5, VENDOR_ID = $6 WHERE KEY = $1"
# Append a daily metrics snapshot; duplicates are silently skipped.
metrics_query = "INSERT INTO ITEMS_METRICS(ITEM_ID, SOLD_COUNT, PRICE, VISITS, AVAILABLE_QUANTITY,CREATED_AT) VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING"
sleep 10.seconds # grace period before connecting — presumably waiting for Postgres/NATS to come up; TODO confirm
pg_pool = ConnectionPool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) do
PG.connect PG_URL
end
# NOTE(review): certificate verification is disabled for all HTTPS traffic.
tls = OpenSSL::SSL::Context::Client.insecure
api_mercadolibre_pool = ConnectionPool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) { HTTP::Client.new "api.mercadolibre.com", tls: tls }
# One HTTP client pool per mercadolivre listing host; a job's permalink is
# matched against these keys to pick the right pool.
http_pools = ["adulto.mercadolivre.com.br", "produto.mercadolivre.com.br", "caminhao.mercadolivre.com.br", "carro.mercadolivre.com.br",
"construcao-manutencao.mercadolivre.com.br", "festa-evento.mercadolivre.com.br", "moto.mercadolivre.com.br",
"servico.mercadolivre.com.br", "suporte-tecnico.mercadolivre.com.br", "transporte-veiculo.mercadolivre.com.br",
"veiculo.mercadolivre.com.br", "www.mercadolivre.com.br"].to_h do |i|
{i, ConnectionPool.new(capacity: WORKERS, initial: INITIAL, timeout: HTTP_TIMEOUT) { HTTP::Client.new i, tls: tls }}
end
# Pulls key/value pairs like "soldStock":123, out of the embedded page JSON.
metrics_regex = /\"(sellerId|startTime|localItemPrice|itemId|quantity|availableStock|soldStock)\"\:([a-zA-Z0-9\:\.\-\"]*)\,/
# Fallback extractors for pages where the embedded JSON is absent.
alternative_sold_count_regex = /<div class="item-conditions">(.*?)<\/div>/
alternative_price_regex = /\{"type":"price","value":(.*?),/
thumb_regex = /a href="(.*?)" class="gallery-trigger/
all_but_numbers = "^0-9" # String#delete charset: keep digits only (note: this also strips the decimal point)
date_format = "%F"       # ISO date (YYYY-MM-DD) for the metrics CREATED_AT column
ok = "1"                 # NOTE(review): appears unused in this file
captures = ["sellerId", "startTime", "localItemPrice", "itemId", "availableStock", "soldStock"]
# NOTE(review): both caches grow without bound for the life of the process.
items_cache = Hash(String, Int32).new   # keys already persisted this run — skip duplicates
sellers_cache = Hash(String, String).new # seller_id -> raw users-API JSON body
# Subscribes to the NATS queue named by ENV["NATS_QUEUE"] and forwards every
# raw message body into the in-process channel consumed by the worker fibers.
# Blocks the calling fiber forever; on any error it logs and re-subscribes.
#
# BUG FIX: the original re-subscribed by calling itself from the rescue
# clause. Crystal does not guarantee tail-call elimination, so repeated
# failures would grow the stack without bound. A loop keeps the identical
# retry-forever behaviour with constant stack usage.
def listen_queue(nc, channel)
  loop do
    begin
      nc.subscribe(ENV["NATS_QUEUE"]) do |msg|
        channel.send(msg.to_s)
      end
      sleep # park this fiber; the subscription callback does the work
    rescue ex
      Log.fatal { ex }
      # fall through and re-subscribe
    end
  end
end
# NATS connection built from environment configuration.
nc = NATS::Connection.new "nats://#{ENV["NATS_USER"]}:#{ENV["NATS_PASSWORD"]}@#{ENV["NATS_HOST"]}:#{ENV["NATS_PORT"].to_i}"
nc.on_close { Log.fatal { "NATS: Connection closed!" } }
# BUG FIX: the original error callback called `Logs.fatal` — an undefined
# constant (typo for `Log`), so the handler itself would raise when invoked.
nc.on_error { |e| Log.fatal { "NATS ERROR: #{e}" } }
# ---------------------------------------------------------------------------
# Worker fibers. Each one loops forever: receive a Job JSON payload from the
# channel, scrape the listing page, enrich via the mercadolibre API, and
# upsert vendor/item/metrics rows in one Postgres transaction. On transient
# errors the raw payload is pushed back onto the channel for a retry.
# ---------------------------------------------------------------------------
WORKERS.times do
  spawn do
    loop do
      payload = channel.receive
      job = Job.from_json payload
      if items_cache[job.key]?
        # Already persisted during this process lifetime — skip.
        next
      else
        begin
          data = nil
          http_key = nil
          # Pick the host pool whose hostname appears in the permalink.
          http_pools.each_key do |k|
            http_key = k if job.permalink.index(k)
          end
          if !http_key
            # Unknown host — silently drop the job.
            next
          else
            data = nil
            http_pools[http_key].connection do |client|
              begin
                data = client.get(URI.parse(job.permalink).path, headers: headers)
              rescue ex
                # Network failure: log and requeue the payload for a retry.
                Log.fatal { "#{ex} #{ex.backtrace?}" }
                channel.send payload
              end
            end
            next if !data
            if data.status_code < 400
              item = data.body
              # Raw values scraped out of the page; underscore prefix marks
              # "unparsed string, may be nil" throughout this block.
              _start_time = nil
              seller_id = nil
              _price = nil
              alternate_item_id = nil
              _available_quantity = nil
              _sold_count = nil
              # Walk every key/value hit of the embedded page JSON and route
              # it by key name (captures[] holds the expected key strings).
              item.scan(metrics_regex) do |n|
                case n[1]
                when captures[0]
                  seller_id = n[2]
                when captures[1]
                  _start_time = n[2]
                when captures[2]
                  _price = n[2]
                when captures[3]
                  alternate_item_id = n[2]
                when captures[4]
                  _available_quantity = n[2]
                when captures[5]
                  _sold_count = n[2]
                end
              end
              # Fallbacks for pages without the embedded JSON blob.
              _sold_count ||= item.match(alternative_sold_count_regex).try(&.[1])
              _thumb = item.match thumb_regex
              # Epoch default when the page exposed no start time.
              start_time = _start_time || Time.utc(1970, 1, 1, 0, 0, 0).to_rfc3339
              price = 0_f32
              available_quantity = 0
              sold_count = 0
              # NOTE(review): delete("^0-9") also strips the decimal point,
              # so "1234.56" becomes "123456" before to_f — confirm intended.
              _price = _price.delete(all_but_numbers) if _price
              if (_price && _price.to_f?)
                price = _price.to_f
              else
                # Second chance: price from the alternative page markup.
                _price = item.match(alternative_price_regex).try(&.[1])
                if _price
                  price = _price.to_f
                else
                  Log.error { "price errado em #{job.permalink}" }
                  next
                end
              end
              if _available_quantity
                __available_quantity = _available_quantity.delete(all_but_numbers)
                if __available_quantity.to_i?
                  available_quantity = __available_quantity.to_i
                end
              end
              if _sold_count
                __sold_count = _sold_count.delete(all_but_numbers)
                if (__sold_count.to_i?)
                  sold_count = __sold_count.to_i
                end
              end
              thumb = _thumb.try(&.[1])
              visits = 0
              vendor = nil
              # Enrichment via the mercadolibre REST API: seller profile
              # (cached by raw body) plus the item's visit counter.
              api_mercadolibre_pool.connection do |client|
                begin
                  if seller_id
                    if sellers_cache[seller_id]?
                      seller = Seller.from_json sellers_cache[seller_id]
                    else
                      _seller = client.get("/users/#{seller_id}", headers: headers)
                      if _seller.status_code < 400
                        seller = Seller.from_json _seller.body
                        sellers_cache[seller_id] = _seller.body
                      end
                    end
                    # seller stays nil when the users call failed (>= 400).
                    vendor = !seller ? nil : {name: seller.nickname, key: seller_id, permalink: seller.permalink, transactions_completed: seller.seller_reputation.transactions.completed, transactions_cancelled: seller.seller_reputation.transactions.canceled}
                    url_visit = client.get("/visits/items?ids=#{job.key}", headers: headers)
                    if url_visit.status_code < 400
                      _visits = JSON.parse url_visit.body
                      if _visits.try(&.[job.key]).try(&.as_i?)
                        visits += _visits[job.key].as_i
                      end
                    end
                  end
                rescue ex
                  Log.fatal { "#{ex} #{ex.backtrace?}" }
                  channel.send payload
                end
              end
              # No vendor means no seller id or a failed lookup — give up.
              next if !vendor
              db_saved = false
              item_update = {start_time: start_time, alternate_item_id: alternate_item_id, updated_at: Time.utc.to_rfc3339, thumb: thumb, vendor_id: seller_id}
              # Vendor upsert, item update and metrics insert succeed or fail
              # together inside one transaction.
              pg_pool.connection do |client|
                begin
                  client.transaction do |tx|
                    tx.connection.exec vendors_query, vendor[:name], vendor[:key], vendor[:permalink], vendor[:transactions_completed], vendor[:transactions_cancelled]
                    tx.connection.exec items_query, job.key, item_update[:start_time], item_update[:alternate_item_id], Time.utc.to_rfc3339, item_update[:thumb], item_update[:vendor_id]
                    tx.connection.exec metrics_query, job.key, sold_count, price, visits, available_quantity, Time.utc.to_s(date_format)
                  end
                  db_saved = true
                rescue ex
                  Log.fatal { "#{ex} #{ex.backtrace?}" }
                  channel.send payload
                end
              end
            else
              Log.fatal { "URL #{job.permalink} retornou status #{data.status_code}" }
            end
          end
          # Mark done only after a committed transaction so retries can run.
          items_cache[job.key] = 1 if db_saved
        rescue ex
          # Catch-all so a single bad job never kills the worker fiber.
          Log.fatal { "ERRO GERAL #{ex} #{ex.backtrace?}" }
          next
        end
      end
    end
  end
end
# Entry point: subscribe and block the main fiber forever, feeding the workers.
listen_queue nc, channel
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment