Skip to content

Instantly share code, notes, and snippets.

@4Kaylum
Last active May 26, 2021 17:42
Show Gist options
  • Save 4Kaylum/d32ce0e4e8bee4f7a84bb869f0398641 to your computer and use it in GitHub Desktop.
Save 4Kaylum/d32ce0e4e8bee4f7a84bb869f0398641 to your computer and use it in GitHub Desktop.
A small UI for the Python Discord server's pixels event
import time
import typing
from datetime import datetime as dt, timedelta
import json
import sys
import multiprocessing
from tkinter.colorchooser import askcolor
import requests
import pygame
# API token for the pixels service. main() passes it to PixelsClient, which
# sends it as a Bearer token in the Authorization header of every request.
# It is blank here, so a real token has to be filled in before running.
TOKEN = ""
class RateLimit(object):
    """
    A request budget together with the moment that budget refills.

    Attributes:
        limit: Total number of requests allowed per window.
        remaining: Requests still available in the current window.
        reset: Naive UTC ``datetime`` at which the window ends.
    """

    def __init__(self, limit, remaining, reset):
        self.limit = limit
        self.remaining = remaining
        self.reset = reset

    def decrement(self):
        """
        Record that one request has been spent.

        If the window has already ended, the budget is refilled to ``limit``
        before the request is counted. ``reset`` is not moved forward here,
        because the length of the next window is not known to this object.
        """
        if self.time_expired:
            self.remaining = self.limit
        self.remaining -= 1

    @property
    def time_expired(self):
        """True once the current UTC time has reached or passed ``reset``."""
        # The window is over when "now" is at or after the reset moment; the
        # comparison used to be the other way round.
        return dt.utcnow() >= self.reset

    @property
    def limit_expired(self):
        """True when no requests are left in the current window."""
        return self.remaining <= 0

    def wait_until_reset(self):
        """
        Block until ``reset`` if, and only if, the budget is exhausted.

        Returns immediately when requests are still available or when the
        reset moment is already in the past.
        """
        # Only an exhausted budget needs a wait; with requests left there is
        # nothing to sleep for.
        if not self.limit_expired:
            return
        remaining_time = (self.reset - dt.utcnow()).total_seconds()
        if remaining_time <= 0:
            return
        print(f"Sleeping for {remaining_time} seconds")
        time.sleep(remaining_time)

    @classmethod
    def from_request(cls, request, add_one: bool = True):
        """
        Build a RateLimit from the headers of a response.

        Reads ``requests-limit``, ``requests-remaining`` and ``requests-reset``
        (falling back to ``cooldown-reset``). The reset header is interpreted
        as a number of seconds from now.

        Args:
            request: Any object with a ``headers`` mapping supporting ``.get``.
            add_one: When true, one is added to the reported remaining count.
                NOTE(review): presumably so that a following ``decrement()``
                call lands on the value the server reported - confirm.

        Returns:
            A new RateLimit, or None when any of the required headers is
            missing.
        """
        headers = request.headers

        limit = headers.get("requests-limit")
        if limit is None:
            return None
        limit = int(limit)

        remaining = headers.get("requests-remaining")
        if remaining is None:
            return None
        remaining = int(remaining) + int(add_one)

        reset_seconds = headers.get("requests-reset", headers.get("cooldown-reset"))
        if reset_seconds is None:
            return None
        reset_seconds = float(reset_seconds)

        return cls(
            limit, remaining, dt.utcnow() + timedelta(seconds=reset_seconds),
        )

    def __repr__(self):
        return f"RateLimit(limit={self.limit}, remaining={self.remaining}, reset={self.reset})"
class Pixel(object):
    """
    One cell of the board: a position plus a colour.

    Attributes:
        x: Column of the pixel.
        y: Row of the pixel.
        colour: Either a hexadecimal RGB string such as ``"ff8000"`` or the
            same value as an integer (``0xff8000``).
    """

    def __init__(self, x: int, y: int, colour: typing.Union[int, str]):
        self.x = x
        self.y = y
        self.colour = colour

    @property
    def rgb(self) -> typing.Tuple[int, int, int]:
        """The colour split into an ``(r, g, b)`` tuple of 0-255 channel values."""
        # Hex strings are parsed; integers are used directly so that both
        # representations of a colour work.
        if isinstance(self.colour, int):
            all_colour = self.colour
        else:
            all_colour = int(self.colour, 16)
        r = (all_colour >> 16) & 0xff
        g = (all_colour >> 8) & 0xff
        b = all_colour & 0xff
        return (r, g, b,)

    @property
    def location(self) -> typing.Tuple[int, int]:
        """The position as an ``(x, y)`` tuple."""
        return (self.x, self.y,)

    def __repr__(self):
        return f"Pixel(x={self.x}, y={self.y}, colour={self.colour})"
class PixelsClient(object):
    """
    Minimal HTTP client for the pixels API rooted at ``BASE``.

    Every request carries the token as a Bearer ``Authorization`` header. The
    most recent rate-limit information seen for each ``(METHOD, path)`` pair
    is remembered and honoured before the next request to that pair.

    Attributes:
        token: The API token.
        _size: Cached ``[width, height]`` of the board, or None until fetched.
        _pixels: The pixels returned by the last ``get_all_pixels`` call.
        _rate_limit: Maps ``(METHOD, path)`` to the latest RateLimit seen.
    """

    BASE = "https://pixels.pythondiscord.com"

    # Back-off used after a 429 response that carries no usable reset header,
    # so a rejected request is never retried in a tight loop.
    RETRY_FALLBACK_SECONDS = 1.0

    def __init__(self, token: str):
        self.token = token
        self._size = None
        self._pixels = list()
        self._rate_limit = dict()

    @property
    def headers(self):
        """The headers sent with every request."""
        return {
            "Authorization": f"Bearer {self.token}"
        }

    def request(self, method, path, **kwargs):
        """
        Perform an HTTP request and return the raw response body.

        Waits first if the remembered rate limit for this endpoint is
        exhausted. A 429 response is retried after a pause; this is a loop
        rather than recursion so repeated 429s cannot exhaust the stack.

        Args:
            method: HTTP method name; looked up case-insensitively on the
                ``requests`` module.
            path: Path appended to ``BASE``.
            **kwargs: Passed through to the ``requests`` call. Must not
                contain ``headers``, which is always supplied here.

        Returns:
            The response body as bytes.
        """
        meth = getattr(requests, method.lower())
        while True:
            self.wait_for_rate_limit(method, path)
            site = meth(self.BASE + path, **kwargs, headers=self.headers)
            self._update_ratelimit(method, path, site)
            print(f"{method.upper()} {path} - {site.status_code} ({len(site.content):,} bytes)")
            if site.status_code != 429:
                return site.content
            self._sleep_for_cooldown(site)

    def request_json(self, *args, **kwargs):
        """Same as ``request`` but with the body decoded from JSON."""
        data = self.request(*args, **kwargs)
        return json.loads(data)

    def wait_for_rate_limit(self, method, path):
        """
        Sleep until the endpoint's window resets if its budget is used up.

        Returns immediately when nothing is known about the endpoint, when
        requests remain, or when the reset moment has already passed.
        """
        rate_limit = self._rate_limit.get((method.upper(), path))
        if rate_limit is None or rate_limit.remaining > 0:
            return
        delay = (rate_limit.reset - dt.utcnow()).total_seconds()
        if delay <= 0:
            return
        print(f"Sleeping for {delay} seconds")
        time.sleep(delay)

    def _sleep_for_cooldown(self, response):
        """Pause after a 429 response for as long as its headers ask."""
        # The same headers RateLimit.from_request reads for its reset time;
        # treated as a number of seconds, as that method does.
        headers = response.headers
        raw_delay = headers.get("cooldown-reset", headers.get("requests-reset"))
        try:
            delay = float(raw_delay)
        except (TypeError, ValueError):
            delay = self.RETRY_FALLBACK_SECONDS
        if delay > 0:
            print(f"Sleeping for {delay} seconds")
            time.sleep(delay)

    def _update_ratelimit(self, method, path, request):
        """
        Remember the rate limit reported by a response.

        The stored entry is replaced on every response that carries the
        rate-limit headers, so the client always acts on the newest values.
        Responses without those headers leave the previous entry in place.
        """
        # add_one=False: the header value is stored as reported, with no
        # local bookkeeping applied on top of it.
        fresh = RateLimit.from_request(request, add_one=False)
        if fresh is None:
            return
        self._rate_limit[(method.upper(), path)] = fresh

    def get_size(self) -> typing.List[int]:
        """
        Get the size of the board as ``[width, height]``.

        The value is fetched once and cached for later calls.
        """
        if self._size:
            return self._size
        data = self.request_json("GET", "/get_size")
        self._size = [data['width'], data['height']]
        print(f"Size found - {self._size}")
        return self._size

    def get_pixel(self, x: int, y: int) -> "Pixel":
        """
        Get a single pixel from the board.
        """
        params = {"x": x, "y": y}
        data = self.request_json("GET", "/get_pixel", params=params)
        # The response's "rgb" key becomes the Pixel's colour; the remaining
        # keys are passed through as constructor arguments.
        colour = data.pop("rgb")
        return Pixel(**data, colour=colour)

    @staticmethod
    def _decode_colours(content, width):
        """
        Yield ``(x, y, colour)`` for each 3-byte group in ``content``.

        Groups are laid out row by row, ``width`` groups per row, and
        ``colour`` is the group as a six-digit lowercase hex string. A
        trailing group shorter than three bytes is ignored.
        """
        x, y = 0, 0
        usable = len(content) - len(content) % 3
        for offset in range(0, usable, 3):
            red, green, blue = content[offset], content[offset + 1], content[offset + 2]
            yield x, y, f"{red:02x}{green:02x}{blue:02x}"
            x += 1
            if x >= width:
                x = 0
                y += 1

    def get_all_pixels(self) -> typing.List["Pixel"]:
        """
        Get every pixel on the board.

        The result is also stored on ``_pixels``.
        """
        size = self.get_size()
        content = self.request("GET", "/get_pixels")
        # Walk the body by offset instead of re-slicing it for each pixel,
        # which copied the whole remaining buffer every time.
        pixels = [
            Pixel(x, y, colour)
            for x, y, colour in self._decode_colours(content, size[0])
        ]
        self._pixels = pixels
        return pixels

    def set_pixel(self, x: int, y: int, colour: int) -> None:
        """
        Sets the colour of a pixel on the board.

        ``colour`` is an integer and is sent as a hex string zero-padded to
        six digits. The decoded response is printed.
        """
        # Named "payload" so the json module is not shadowed in this method.
        payload = {"x": x, "y": y, "rgb": format(hex(colour)[2:], "0>6")}
        data = self.request_json("POST", "/set_pixel", json=payload)
        print(data)
def ask_colour_set_pixel(client, mouse_location):
    """
    Open a colour picker and paint the board pixel under `mouse_location`.

    Window coordinates are divided by 10 to find the board pixel. Nothing is
    sent when the picker is dismissed without choosing a colour.
    """
    board_x = mouse_location[0] // 10
    board_y = mouse_location[1] // 10
    pixel_location = (board_x, board_y)

    # The picker's second result is the "#"-prefixed hex string; the first
    # result is not needed.
    _unused, colour = askcolor()
    if colour is not None:
        colour = colour.lstrip("#")
        print(f"Setting pixel at {pixel_location} to {colour}")
        client.set_pixel(board_x, board_y, int(colour, 16))
        return

    print("Cancelled colour set")
def main():
    """
    Run the pixels UI: show the board and react to keyboard and mouse input.

    Each board pixel is drawn as a 10x10 square. Space re-downloads the
    board, a left click opens a colour picker for the clicked pixel, and
    closing the window exits the program.
    """
    # Imported here so this function brings its own dependency into scope.
    import threading

    # Make a pixel client
    client = PixelsClient(TOKEN)
    size = client.get_size()

    # Init Pygame
    pygame.init()
    screen = pygame.display.set_mode((size[0] * 10, size[1] * 10))
    clock = pygame.time.Clock()

    # Main window loop
    client.get_all_pixels()  # This'll get all of the pixels and store them in :attr:`_pixels`.
    get_new_pixels_thread = None
    set_pixel_process = None
    mouse_location = None
    while True:

        # Handle events
        for event in pygame.event.get():

            # See if the user wants to quit
            if event.type == pygame.QUIT:
                # The refresh thread is a daemon, so it ends with the
                # program; only the picker process needs stopping.
                if set_pixel_process:
                    set_pixel_process.terminate()
                sys.exit(0)

            # See if they want to refresh the grid
            elif event.type == pygame.KEYDOWN and event.key == pygame.K_SPACE:
                if get_new_pixels_thread and get_new_pixels_thread.is_alive():
                    print("Already waiting on grabbing new pixels")
                else:
                    print("Space detected - grabbing new pixels")
                    # A thread, not a process: the download has to update
                    # *this* client's `_pixels`, which the draw loop below
                    # reads. A child process would only update its own copy.
                    get_new_pixels_thread = threading.Thread(
                        target=client.get_all_pixels,
                        daemon=True,
                    )
                    get_new_pixels_thread.start()

            # See if they want to set a new pixel
            elif event.type == pygame.MOUSEBUTTONDOWN and event.button == 1:
                # Use the click's own position: `mouse_location` is still
                # None if the mouse has not moved since the window opened.
                set_pixel_process = multiprocessing.Process(
                    None, ask_colour_set_pixel, args=(client, event.pos),
                )
                set_pixel_process.start()

            # Mousemove - highlight hovered over pixel
            elif event.type == pygame.MOUSEMOTION:
                mouse_location = pygame.mouse.get_pos()

        # Get pixels to show on screen
        for p in client._pixels:
            rect = pygame.Rect(p.x * 10, p.y * 10, 10, 10)
            pygame.draw.rect(screen, p.rgb, rect)

        # Draw the mouse highlight box
        if mouse_location:
            rect = pygame.Rect((mouse_location[0] // 10) * 10, (mouse_location[1] // 10) * 10, 10, 10)
            pygame.draw.rect(screen, (255, 255, 255), rect, 2)

        # Update display
        pygame.display.flip()

        # Tell the window to tick the FPS counter
        clock.tick(60)
# Start the UI only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment