Skip to content

Instantly share code, notes, and snippets.

@tripleee
Last active May 11, 2022 13:16
Show Gist options
  • Save tripleee/af76291a51b8bb81614ea3fe59cd2d24 to your computer and use it in GitHub Desktop.
Save tripleee/af76291a51b8bb81614ea3fe59cd2d24 to your computer and use it in GitHub Desktop.
Smokey experiment, rescan after 5min
import json
import time
from datetime import datetime, timedelta
import websocket
import requests
from bodyfetcher import BodyFetcher
from globalvars import GlobalVars
from datahandling import load_files
from spamhandling import check_if_spam, check_if_spam_json
from parsing import fetch_post_id_and_site_from_url
from apigetpost import api_get_post
from classes._Post import Post
# --- Shared state set up once at start-up -----------------------------------

# Install a BodyFetcher on GlobalVars, then call datahandling.load_files().
# NOTE(review): presumably the spam checks rely on both -- confirm against
# the imported modules.
GlobalVars.bodyfetcher = BodyFetcher()
load_files()

# Open the Stack Exchange websocket and subscribe to the
# "155-questions-active" feed; the main loop reads from this connection.
ws = websocket.create_connection("wss://qa.sockets.stackexchange.com/")
ws.send("155-questions-active")

# Running counters; scan() increments them and prints them in its
# progress line.
stats = {
    'scanned': 0,
    'rescanned': 0,
    'spam': 0,
}

# Host name -> short site name for some non-English Stack Overflow sites.
# Not referenced by the rest of the visible script.
sites = {
    "es.stackoverflow.com": "es.stackoverflow",
    "ja.stackoverflow.com": "ja.stackoverflow",
    "pt.stackoverflow.com": "pt.stackoverflow",
    "ru.stackoverflow.com": "ru.stackoverflow",
}

# Pending rescans: due time (datetime) -> list of posts to scan again.
queue = {}
def scan(post, rescan=False):
    """Run the spam checks on one post and report the result.

    First scan (``rescan=False``): ``post`` is the dict built by the websocket
    loop, ``{'post': <raw message>, 'data': <decoded payload>}``.  The raw
    message is passed to ``check_if_spam_json`` and the verdict is remembered
    in ``post["_scanned"]`` as ``(is_spam, reasons)``.

    Rescan (``rescan=True``): ``post`` is a dict that already went through a
    first scan.  The post is fetched again with ``api_get_post``, wrapped in a
    ``Post`` and passed to ``check_if_spam``.  A "grace period edit" line is
    printed when the verdict flipped (with a terminal bell) or when the
    title/body no longer match what the feed originally delivered.

    In both modes the function updates the module-level ``stats`` counters,
    appends detected posts to ``detected.json`` (one JSON object per line) and
    prints a progress line.

    NOTE(review): the nesting of this function was reconstructed from a copy
    whose indentation had been stripped.  The ``rescan``/``else`` split is
    forced by the data flow; placing the keyword bell and the
    ``detected.json`` write under ``if is_spam`` is the most plausible reading
    -- confirm against the author's original.
    """
    stats['scanned'] += 1

    if rescan:
        stats['rescanned'] += 1
        prev = post
        # What the websocket feed showed at first-scan time.
        orig = (
            prev["data"].get("titleEncodedFancy", None),
            prev["data"].get("bodySummary", None),
        )
        url = prev["data"]["url"]
        site = prev["data"]["siteBaseHostAddress"]
        post_id = prev["data"]["id"]

        # Fetch the current state of the post and check it afresh.
        post_data = api_get_post(url)
        post = Post(api_response=post_data.as_dict)
        is_spam, reasons, why = check_if_spam(post)

        if prev["_scanned"][0] != is_spam:
            # Verdict changed since the first scan: ring the bell as well.
            print("\agrace period edit %s" % url)
        elif orig != (post.title, post.body):
            # NOTE(review): this compares the feed's "bodySummary" with the
            # fetched post body; presumably these differ for most posts even
            # without an edit -- confirm this is the intended comparison.
            print("grace period edit %s" % url)
    else:
        is_spam, reasons, why = check_if_spam_json(post['post'])
        # Remember the verdict so a later rescan can detect a change.
        post["_scanned"] = is_spam, reasons
        site, post_id = post['data']['siteBaseHostAddress'], post['data']['id']

    if is_spam:
        stats['spam'] += 1
        # Ring the bell on first detection of these particular reasons.
        if any(x in reason for x in (
                "coinbase", "quickbooks", "binance", "airline", "robinhood",
                "number") for reason in reasons):
            if not rescan:
                print("\a")
        # NOTE(review): on a rescan ``post`` is a Post instance rather than
        # the feed dict; json.dumps raises TypeError for objects it cannot
        # serialise -- confirm Post is serialisable or log a dict instead.
        with open('detected.json', 'a') as j:
            j.write(json.dumps({
                'is_spam': is_spam,
                'reasons': reasons,
                'why': why,
                'post': post}) + '\n')

    print('[%i/%i/%i] %s %s:%i' % (
        stats['scanned'], stats['rescanned'], stats['spam'],
        "Rescan" if rescan else "Scan", site, post_id))
    if is_spam or reasons:
        print(is_spam, reasons, why)
# Main loop: read one websocket message, scan any new question it announces,
# then run every rescan that has become due.
#
# NOTE(review): the nesting was reconstructed from a copy whose indentation
# had been stripped.  In particular, scan(post) is assumed to run for every
# announced question while only stackoverflow.com questions are queued for a
# rescan -- confirm against the author's original.
while True:
    # FIXME: copy/paste from ws.py
    try:
        a = ws.recv()
        if a is not None and a != "":
            j = json.loads(a)
            action = j["action"]
            if action == "hb":
                # Answer the server's heartbeat.
                ws.send("hb")
            elif action == "155-questions-active":
                # The payload is itself a JSON-encoded string.
                data = json.loads(j['data'])
                post = {'post': a, 'data': data}
                if data["siteBaseHostAddress"] == "stackoverflow.com":
                    # Schedule a second look five minutes from now.
                    schedule = datetime.now() + timedelta(minutes=5)
                    queue.setdefault(schedule, []).append(post)
                scan(post)
    except Exception as e:
        print(e)
        time.sleep(10)

    # Collect the due timestamps first so the dict is not modified while it
    # is being iterated.
    now = datetime.now()
    tbd = [timestamp for timestamp in queue if timestamp <= now]
    for timestamp in tbd:
        # Remove the bucket before rescanning so a failure cannot leave it
        # behind to be retried forever.
        for item in queue.pop(timestamp):
            try:
                scan(item, rescan=True)
            except Exception as e:
                # One failing rescan must not terminate the whole script;
                # report it the same way as receive errors and carry on.
                print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment