Skip to content

Instantly share code, notes, and snippets.

@badjano
Last active October 19, 2018 23:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save badjano/bcc2b13e99707542d2708c97b5759148 to your computer and use it in GitHub Desktop.
Save badjano/bcc2b13e99707542d2708c97b5759148 to your computer and use it in GitHub Desktop.
ImageConvolutionImage
from PIL import ImageFilter, Image
from math import floor, sqrt, tanh
from threading import Thread, Lock
from multiprocessing import cpu_count
from queue import Queue
import time
import pygame
# Module-level lock. In this file it is referenced only by the commented-out
# logging inside do_work().
lock = Lock()
# Queue of thread indices: conv_iteration() puts one index per worker thread
# and each worker() takes one when it starts.
q = Queue()
# Progress table: tile (quad) index -> fraction of that tile's pixels already
# processed. Written by conv_image(), read by the progress displays.
threads_status = {}
def clamp(a, _min, _max):
    """Clamp ``a`` into the half-open range ``[_min, _max)``.

    The upper bound is exclusive: the largest value returned is
    ``_max - 1``, so with ``_min == 0`` the result is a valid index into a
    sequence of length ``_max``. When the range is empty
    (``_max - 1 < _min``) the lower bound wins and ``_min`` is returned.

    Args:
        a: Value to clamp.
        _min: Inclusive lower bound.
        _max: Exclusive upper bound.

    Returns:
        ``a`` limited to ``_min .. _max - 1``.
    """
    highest = _max - 1
    return max(_min, min(a, highest))
def limit(point, w, h):
    """Return ``point`` moved inside a ``w`` x ``h`` pixel grid.

    Each coordinate is clamped independently, so points outside the grid
    map to the nearest edge pixel.

    Args:
        point: Sequence whose first two items are the x and y coordinates.
        w: Grid width; valid x coordinates are ``0 .. w - 1``.
        h: Grid height; valid y coordinates are ``0 .. h - 1``.

    Returns:
        An ``(x, y)`` tuple lying inside the grid.
    """
    x = clamp(point[0], 0, w)
    y = clamp(point[1], 0, h)
    return (x, y)
def conv_image(im, kernel, out, rect, quad_index, t):
    """Convolve one rectangular tile of ``im`` with ``kernel`` into ``out``.

    Every kernel pixel contributes one tap. Its channel values ``v`` are
    turned into signed weights ``v * 2 - 128`` and the tap at kernel position
    ``(x2, y2)`` samples the source at ``(x - x2 // 2, y - y2 // 2)``, clamped
    to the image so edge pixels are repeated. Each output channel is the
    weighted sum divided by the sum of the absolute weights of that channel.

    Progress is published after every pixel in
    ``threads_status[quad_index]`` as a fraction of the tile.

    Args:
        im: Source image; ``getpixel`` must return three channel values
            (assumes an RGB image -- TODO confirm against callers).
        kernel: Kernel image; ``getpixel`` must return three channel values.
        out: Destination image, written through ``putpixel``.
        rect: ``(x_start, y_start, x_end, y_end)`` with exclusive end
            coordinates.
        quad_index: Key under which progress is stored in ``threads_status``.
        t: Index of the calling worker thread. Unused here; kept so the
            signature stays compatible with ``do_work``.
    """
    im_w, im_h = im.size
    k_w, k_h = kernel.size
    x_start, y_start, x_end, y_end = rect
    total_pixels = (x_end - x_start) * (y_end - y_start)

    # The kernel does not change while the tile is processed, so read it once
    # instead of calling kernel.getpixel() for every output pixel.
    # NOTE(review): the source offset is half the kernel coordinate rather
    # than a centred offset -- confirm this sampling pattern is intended.
    taps = []
    for x2 in range(k_w):
        for y2 in range(k_h):
            k_r, k_g, k_b = kernel.getpixel((x2, y2))
            taps.append((
                x2 // 2,
                y2 // 2,
                k_r * 2 - 128,
                k_g * 2 - 128,
                k_b * 2 - 128,
            ))

    # Per-channel normalisation factors are the same for every pixel.
    norm_r = sum(abs(tap[2]) for tap in taps)
    norm_g = sum(abs(tap[3]) for tap in taps)
    norm_b = sum(abs(tap[4]) for tap in taps)

    processed = 0
    for x in range(x_start, x_end):
        for y in range(y_start, y_end):
            acc_r = acc_g = acc_b = 0
            for off_x, off_y, w_r, w_g, w_b in taps:
                src = limit((x - off_x, y - off_y), im_w, im_h)
                r, g, b = im.getpixel(src)
                acc_r += r * w_r
                acc_g += g * w_g
                acc_b += b * w_b
            # A zero normaliser means every weight of that channel is zero,
            # so the accumulated value is zero as well; emit 0 instead of
            # raising ZeroDivisionError.
            pixel = (
                int(acc_r / norm_r) if norm_r else 0,
                int(acc_g / norm_g) if norm_g else 0,
                int(acc_b / norm_b) if norm_b else 0,
            )
            out.putpixel((x, y), pixel)
            processed += 1
            threads_status[quad_index] = processed / total_pixels
def do_work(data, t):
    """Process one queued work item by convolving its tile.

    Args:
        data: 5-tuple ``(im, kernel, out, rect, quad_index)`` in the layout
            built by ``conv_iteration``.
        t: Index of the calling worker thread; forwarded to ``conv_image``.
    """
    im, kernel, out, rect, quad_index = data
    conv_image(im, kernel, out, rect, quad_index, t)
def worker(data):
    """Thread body: drain the shared work list until it is empty.

    The thread first takes its index from the module-level queue ``q`` and
    then repeatedly removes the first item of ``data`` and processes it.

    Args:
        data: List of work items shared between all worker threads. Items
            are removed from it as they are claimed.
    """
    thread_index = q.get()
    try:
        while True:
            # Several workers pop from the same list, so "check length, then
            # pop" can lose the race for the last item. Pop first and treat
            # an empty list as the signal to stop.
            try:
                item = data.pop(0)
            except IndexError:
                break
            do_work(item, thread_index)
    finally:
        # Always balance the q.get() above, even if a tile failed.
        q.task_done()
def tiles(size, count):
    """Split a ``size`` area into ``count`` rectangles that cover it exactly.

    Tiles are laid out row by row on a grid with ``round(sqrt(count))``
    columns. Boundaries are computed with integer arithmetic, so the tiles
    never overlap, never extend past the area and leave no gaps, even when
    the size is not divisible by the grid. If ``count`` does not fill the
    last row, that row's tiles are widened to span the full width.

    Args:
        size: ``(width, height)`` of the area in pixels.
        count: Number of tiles to produce.

    Returns:
        List of ``(x0, y0, x1, y1)`` tuples with exclusive end coordinates,
        in row-major order. Empty when ``count`` is not positive.
    """
    if count <= 0:
        return []
    width, height = size
    cols = max(1, round(sqrt(count)))
    rows = (count + cols - 1) // cols  # ceiling division
    ts = []
    for row in range(rows):
        y0 = row * height // rows
        y1 = (row + 1) * height // rows
        # The last row may hold fewer tiles than a full row.
        in_row = min(cols, count - row * cols)
        for col in range(in_row):
            x0 = col * width // in_row
            x1 = (col + 1) * width // in_row
            ts.append((x0, y0, x1, y1))
    return ts
def gcd(a, b):
    """Return the greatest common divisor of ``a`` and ``b``.

    Uses the Euclidean algorithm: the pair ``(a, b)`` is replaced by
    ``(b, a % b)`` until the second value reaches zero.

    Args:
        a: First integer.
        b: Second integer.

    Returns:
        The value left in ``a`` once ``b`` is zero; ``gcd(x, 0)`` is ``x``.
    """
    while b != 0:
        a, b = b, a % b
    return a
def monitor(interval=0.5):
    """Print convolution progress to stdout until all tiles are finished.

    Each report prints the number of work items still waiting in ``data``
    followed by the completion fraction of every tile registered in
    ``threads_status``. The loop ends once no work is queued and every
    registered tile reports a fraction of 1.

    Args:
        interval: Seconds to sleep between reports, so the loop does not
            spin at full speed.
    """
    while True:
        # Snapshot the table so the check and the report see the same values.
        progress = list(threads_status.items())
        all_finished = sum(v for _, v in progress) >= len(progress)
        if not data and all_finished:
            break
        print(len(data))
        print(", ".join(["%d : %.02f" % (k, v) for k, v in progress]))
        time.sleep(interval)
# Shared list of pending work items (im, kernel, out, tile, quad_index):
# filled by conv_iteration() and drained by the worker threads.
data = []
def conv_iteration():
    """Convolve ``img/image.jpg`` with the kernel image and save the result.

    The source image is resized to 800x450, split into tiles, and the tiles
    are processed by background threads. While they run, a pygame window
    shows one progress bar per tile. The window loop ends when all workers
    have finished or the window is closed; whatever has been written to the
    output image by then is saved to ``threaded_abstract.png``.
    """
    size_scale = 5
    size = (160 * size_scale, 90 * size_scale)
    screen = pygame.display.set_mode(size)
    pygame.display.set_caption("My Game")
    clock = pygame.time.Clock()

    # conv_image() unpacks three channels per pixel, so force RGB for source
    # images stored in another mode (e.g. greyscale JPEGs).
    im = Image.open("img/image.jpg").convert("RGB").resize(size)
    kernel = Image.open("img/kernel_abstract.png").convert("RGB")
    out = Image.new('RGB', size, 0xffffff)

    # A gcd x gcd grid divides both dimensions evenly.
    quads = gcd(size[0], size[1]) ** 2
    print(quads)
    all_tiles = tiles(size, quads)
    for quad_index, tile in enumerate(all_tiles):
        data.append((im, kernel, out, tile, quad_index))
        threads_status[quad_index] = 0

    # Leave two cores free, but always start at least one worker so machines
    # with one or two cores still produce output.
    threads = []
    for i in range(max(1, cpu_count() - 2)):
        q.put(i)
        t = Thread(target=worker, args=(data,))
        t.daemon = True
        t.start()
        threads.append(t)

    done = False
    # Thread.isAlive() was removed in Python 3.9; is_alive() is the
    # supported spelling.
    while not done and any(t.is_alive() for t in threads):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                done = True
        screen.fill((0, 0, 0))
        for k, v in threads_status.items():
            tile = all_tiles[k]
            tile_w = tile[2] - tile[0]
            # Grey track with a white bar scaled by the tile's progress.
            pygame.draw.rect(screen, (127, 127, 127), [tile[0], tile[1], tile_w, 10])
            pygame.draw.rect(screen, (255, 255, 255), [tile[0], tile[1], v * tile_w, 10])
        pygame.display.flip()
        clock.tick(60)

    pygame.quit()
    out.save("threaded_abstract.png")
# Script entry point: runs the whole pipeline when the module is executed.
conv_iteration()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment