Created
February 25, 2024 16:15
-
-
Save Septicuss/348e861d505d20e81618e9f217ad6937 to your computer and use it in GitHub Desktop.
Python tori.fi query scraper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import requests | |
import json | |
import time | |
from datetime import datetime, timedelta | |
# --------------------------------
# DEFAULT IMAGE (AD DOESN'T HAVE AN IMAGE)
# Base64-encoded PNG placeholder embedded as a data: URI when an ad has no thumbnail.
DEFAULT_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAOEAAADhCAMAAAAJbSJIAAAAkFBMVEXqIiP////qHyDpAADrJSb+9PTqGhvpDA7pExTrLC397Oz1nZ3ygYLyiIj50NDyf3/tTk7vWVnrMjP5xcbwcnPzjo/3sbHvXl7uU1TpDxH4vb3zk5P/+vr2qan85eX3tbXtREX829vsOTrvZGT96Of83t771NTwcHHvYWL1mprxcnD4u7vrODjsQEH3tLT1o6IHv/RDAAAEU0lEQVR4nO3ba3uaMBgGYMxrE7TWwjxisVZru4Oz/f//bgKigbwB3XQj7rk/9SIxyQOUY/A8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgEpCyIT4rZ+W2Qos1Zla1jZrO+UG6NOO7wUJT+3+jpja0pCXtE3CVpJ0x9Q3KtnbrOjVks+nTm/RX7cO4uVmtCJVDjktCzv7Bh5aZQNKC4ZGwcj3RCc2FpcIoT6Nhe9U3IJ3Ro2ez+WLqLfkOunOVyQLLZrjCrIKfu/0hA9JQnNsRXe7hPfG0n4xoewaNR6ZhJIe7N31AzpuR26dOZBQvbxVdjU/7qpuJqTHmq5abyLfU51MqMyRGbpt6W5CGdZ0lHpTwtWEQj3XdJQZk6sJ1feafnKvvpsJhazrJ/dGbiZkhmXzKp1MSP2abo7GysWE9d0crcnFhPK1phe9w45wMCFzP2A3kX83YXyJhNGIa/p9MOAWJ7+8asJ4M9Ntduey6yQckqIts4Ifrp2wSwXqWgnnSRPqq1nwFF09oVdynYRhet5r33DCaZbQTHIzCUMkREI9Yco8atkS6g8pxZUTfjELzk+4Hi92PsyzqyXhKtBshQMJrSzXNLGmtaEbTFgwQ0IkREKzAAndTzi6+YS9s+8PrZAwcZmEam62jITGciRkIGECCd1POLjBhHfDniZ519X4hPHHx+4W/8mc9mN7iuFrklYbn3CdPad5OTmhV9L4hN30cZIM3Ep4zqwvFxN63qTsWz5zz5mE0+qE1tmX7iScVCe0ukTC8qpjE+rPjNmESqsRPZlB+iQ8QczL4aGWUMjIlxdPGIQFbMLlsdKLYBN+nx5qdAT7lnvQIWKSZ/+gaTxF2x+jx5AKU0//PGEZ+5Zbs1BsQs1ESuZFqE06UyGNIj/3gxtMtamnF0/Iz1TQjGsThlK+1E5FPljv/8VpqP1mfJzy3syEHlVWKLeXBixOhOsfZpY3NeGmppuj7ECjytNtZ5E94XO62f9tQubtpa2/dHuIn8a48tNPQxMK/9TddEGWE+izL5qckDvj8LKrz2htloTW69JGJDx17t5gP1jm2PsUNTqhp06b2rZNNxQ70y/bf5ub0KP3mp4S91kKNuGs6QlP2U/zj43khCn8VA1P6PnTmq6SvvZXpMwklNbXZh9p0gNkTcR4dbjCppk5EK/ZZ4tscKuqes+d4y0E8/3JXDX6jJ8PPDK3zaGdSL9HokWpuJvvwc1OuLvlDZdsncK3eUlFVZzdHwcVTzHWf5QwZr+w1Hwo9tmfZqJvHEnBonw6786/UOlOfrcu9M29Do43/md8JZsdtX7vK9lS83Ut6CRRcL9YZjHj98Voa34jm+6ow/zSLZ5LfQX8nS+dz61QCpl836zaXjt5VB2VN1/Op9fPQb8/Hkri1oADKr/4TmVrwnc0HwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA/C9+ARZultqQpurbAAAAAElFTkSuQmCC"
# TORI RELATED API
# {} placeholder: the numeric media id of an ad image.
TORI_API_IMAGES = "https://images.tori.fi/api/v1/imagestori/images/{}.jpg?rule=thumb_280x210"
# {} placeholder: URL-encoded search query string.
TORI_API_ADS = "https://api.tori.fi/api/v1.2/public/ads?&q={}"
# Search terms scanned on every pass ('+' separates words inside one query).
TORI_QUERIES = [
    "stray+kids",
    "skz",
    "zb1",
    "zerobaseone"
]
# DISCORD WEBHOOK FOR NOTIFICATIONS
# NOTE(review): DISCORD_WEBHOOK_URL must be filled in before notifications can be sent.
DISCORD_WEBHOOK_URL = ''
DISCORD_MESSAGE = "@everyone"
# "UUSI TUOTE" is Finnish for "NEW PRODUCT"; {} is replaced with the ad body text.
DISCORD_DESCRIPTION = "**UUSI TUOTE**\n\n{}"
# WHEN CHANGES ARE SCRAPED
UPDATE_PERIOD_SECONDS = 60 * 5  # every 5 minutes
DATA_PURGE_PERIOD_DAYS = 10  # every 10 days
# --------------------------------
# DATABASE | |
# Keeps track of which IDs have already been scraped | |
class Data:
    """SQLite-backed record of advertisement IDs that have already been scraped.

    Each seen ad id is stored together with the timestamp it was first seen,
    so old entries can be purged periodically via purge_old().
    """

    def __init__(self, db_path: str = "data.db"):
        """Open (or create) the database.

        Parameters:
            db_path: SQLite database location. The default preserves the
                original on-disk file; pass ":memory:" for an ephemeral store
                (useful in tests).
        """
        self.__db = sqlite3.connect(db_path)
        # Autocommit mode: every execute() is persisted immediately.
        self.__db.isolation_level = None
        self.__create_tables()

    def __create_tables(self):
        """Create the items table, migrating older databases missing created_at."""
        existing_table_query = "SELECT name FROM sqlite_master WHERE type='table' AND name='items'"
        result = self.__db.execute(existing_table_query).fetchone()
        if result:
            # Table exists; add the created_at column if this is an old database.
            columns = self.__db.execute("PRAGMA table_info(items)").fetchall()
            if not any(column[1] == 'created_at' for column in columns):
                self.__db.execute("ALTER TABLE items ADD COLUMN created_at TIMESTAMP")
            # Backfill rows written before the column existed so purge_old can
            # eventually expire them. (BUG FIX: the original ran this backfill
            # only when creating a brand-new — and therefore empty — table,
            # where it had no effect.)
            now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.__db.execute(
                "UPDATE items SET created_at = ? WHERE created_at IS NULL", (now,)
            )
        else:
            # Fresh database: create the table with the timestamp column.
            self.__db.execute(
                "CREATE TABLE items (id TEXT PRIMARY KEY, created_at TIMESTAMP)"
            )

    def add(self, id: str):
        """Record an ad id as seen, stamping the current time.

        BUG FIX: the original inserted only the id and left created_at NULL,
        so purge_old (which compares created_at < threshold) never matched
        those rows and the database grew without bound.
        """
        created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.__db.execute(
            "INSERT INTO items (id, created_at) VALUES (?, ?)", (id, created)
        )

    def exists(self, id: str) -> bool:
        """Return True when the given ad id has already been recorded."""
        row = self.__db.execute("SELECT id FROM items WHERE id=?", (id,)).fetchone()
        return bool(row and row[0])

    def purge_old(self, days_threshold: int = 10) -> int:
        """Delete entries older than days_threshold days; return how many were removed.

        BUG FIX: the original returned count_before_deletion minus itself,
        so it always reported 0 deletions.
        """
        threshold_date = datetime.now() - timedelta(days=days_threshold)
        formatted_date = threshold_date.strftime("%Y-%m-%d %H:%M:%S")
        # ISO-formatted timestamps compare correctly as strings in SQLite.
        count_statement = "SELECT COUNT(*) FROM items WHERE created_at < ?"
        count_before = self.__db.execute(count_statement, (formatted_date,)).fetchone()[0]
        self.__db.execute("DELETE FROM items WHERE created_at < ?", (formatted_date,))
        count_after = self.__db.execute(count_statement, (formatted_date,)).fetchone()[0]
        return count_before - count_after
# TORIPRODUCT | |
# Taking necessary data from the advertisement json returned from tori.fi | |
class ToriProduct:
    """Extracts the fields this scraper needs from a tori.fi advertisement JSON."""

    def __init__(self, ad: dict):
        """
        Parameters:
            ad: one advertisement object from the tori.fi public ads API.

        Raises:
            KeyError: if a mandatory field (subject, share_link, body) is missing.
        """
        self.ad = ad
        self.title = ad["subject"]
        self.url = ad["share_link"]
        self.description = ad["body"]
        # Some listings may lack the nested user/account block; fall back to a
        # placeholder instead of crashing the whole scrape pass.
        self.author = ad.get("user", {}).get("account", {}).get("name", "-")
        self.__setup_image()
        self.__setup_price()
        self.__setup_for_sale()

    def __setup_image(self):
        # Resolve the ad thumbnail URL, or fall back to the bundled placeholder.
        if "thumbnail" in self.ad:
            # media_id looks like "/public/media/ad/9116093602" -> trailing id
            image_id = self.ad["thumbnail"]["media_id"].split("/")[-1]
            self.image_url = TORI_API_IMAGES.format(image_id)
        else:
            # BUG FIX: the original used a plain string literal (no f-prefix),
            # so image_url was the literal text "{DEFAULT_IMAGE_BASE64}"
            # instead of the embedded placeholder image data.
            self.image_url = f"data:image/png;base64,{DEFAULT_IMAGE_BASE64}"

    def __setup_price(self):
        # Human-readable price label; "-" when the ad carries no list price.
        self.price = '-'
        if "list_price" in self.ad and "label" in self.ad["list_price"]:
            self.price = self.ad["list_price"]["label"]

    def __setup_for_sale(self):
        # type.code == 's' appears to mark a "for sale" listing (as opposed to
        # wanted/giveaway) — presumed from usage in scrape_query; confirm
        # against the tori.fi API.
        self.for_sale = False
        if "type" in self.ad and "code" in self.ad["type"]:
            code = self.ad["type"]["code"]
            self.for_sale = (code == 's')
def notify_webhook(product: ToriProduct):
    """Post a Discord embed describing a newly found product to the webhook.

    Parameters:
        product: the parsed advertisement to announce.

    Silently does nothing when DISCORD_WEBHOOK_URL is not configured —
    BUG FIX: the shipped default is an empty string, and requests.post('')
    raises, which previously killed the scrape loop on the first new product.
    """
    if not DISCORD_WEBHOOK_URL:
        return
    payload = {
        'content': DISCORD_MESSAGE,
        'embeds': [
            {
                'fields': [
                    {
                        'name': 'Hinta',
                        'value': product.price
                    },
                    {
                        'name': 'Ilmoittaja',
                        'value': product.author
                    }
                ],
                'title': product.title,
                'description': DISCORD_DESCRIPTION.format(product.description),
                'url': product.url,
                'thumbnail': {
                    'url': product.image_url,
                },
            }
        ]
    }
    headers = {
        'Content-Type': 'application/json'
    }
    # Timeout so a hung webhook call cannot stall the scraper indefinitely.
    requests.post(DISCORD_WEBHOOK_URL, data=json.dumps(payload), headers=headers, timeout=30)
def request_listings(query: str):
    """Fetch the tori.fi listings JSON for one search query.

    Returns:
        The decoded JSON response dict, or None on any failure.
    """
    url = TORI_API_ADS.format(query)
    try:
        # Timeout so a stalled request cannot hang the scrape loop forever.
        response = requests.get(url, timeout=30)
    except requests.RequestException as err:
        # BUG FIX: network errors (DNS failure, connection reset, timeout)
        # previously propagated out of the infinite run() loop and killed
        # the long-running daemon. Report and let the next cycle retry.
        print("Request failed for query", query, "-", err)
        return None
    if response.status_code == 200:
        return response.json()
    print("Got response code", response.status_code, "for query", query)
    return None
def scrape_query(query: str, data: Data):
    """Scrape one search query: record and announce every new for-sale listing.

    Parameters:
        query: URL-encoded search term.
        data: the already-seen-ids store.
    """
    listings = request_listings(query)
    if not listings:
        print("Failed to query listings")
        return
    updated_amount = 0
    # .get guards against a response without the list_ads key.
    for ad_obj in listings.get("list_ads", []):
        # The trailing path segment of the share link is used as a stable ad id.
        # (Renamed from `id`, which shadowed the builtin.)
        ad_id = ad_obj["ad"]["share_link"].split("/")[-1]
        if data.exists(ad_id):
            continue
        product = ToriProduct(ad_obj["ad"])
        # We're looking for products on sale only
        if not product.for_sale:
            print("Product wasn't for sale:", product.url)
            continue
        print("Adding", ad_id, product.title)
        data.add(ad_id)
        notify_webhook(product)
        updated_amount += 1
    # The original kept a separate `updated` boolean that merely mirrored
    # updated_amount > 0; the counter alone suffices.
    if updated_amount:
        print("Total of", updated_amount, "products for", query)
def scrape_queries(data: Data):
    """Run a single scrape pass over every configured search query."""
    for search_term in TORI_QUERIES:
        print("Scraping query", search_term)
        scrape_query(search_term, data)
def purge_old_data(data: Data):
    """Drop database entries older than the configured retention period."""
    removed = data.purge_old(DATA_PURGE_PERIOD_DAYS)
    if removed > 0:
        print(f"Purged {removed} old entries.")
def run():
    """Main daemon loop: scrape all queries, purge stale rows, sleep, repeat."""
    data = Data()
    while True:
        try:
            scrape_queries(data)
            purge_old_data(data)
        except Exception as err:
            # Boundary handler: one bad pass (malformed ad JSON, DB hiccup,
            # webhook failure) must not terminate the long-running daemon.
            # Report it and retry on the next cycle.
            print("Scrape pass failed:", err)
        # Until next time!
        time.sleep(UPDATE_PERIOD_SECONDS)


if __name__ == "__main__":
    # Guarded so importing this module (e.g. for tests) doesn't start the loop.
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment