Last active
August 29, 2015 14:05
-
-
Save jedie/3a18c659faa70bf10f36 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Queue | |
import Tkinter | |
import logging | |
import multiprocessing | |
import random | |
import threading | |
import time | |
log = multiprocessing.log_to_stderr() | |
handler = logging.StreamHandler() | |
handler.setFormatter( | |
logging.Formatter("[%(processName)s %(threadName)s] %(message)s") | |
) | |
#log.handlers = (handler,) | |
# #log.setLevel(level=99) | |
# FAKED_CPU_LOAD = 10 | |
# FAKED_CPU_LOAD = 100 | |
FAKED_CPU_LOAD = 1000 | |
# FAKED_CPU_LOAD = 10000 | |
class FakeCPU(threading.Thread): | |
def __init__(self, display_queue): | |
super(FakeCPU, self).__init__(name="FakeCPU") | |
#log.critical("FakeCPU thread init") | |
self.display_queue = display_queue | |
def run(self): | |
#log.critical("FakeCPU thread run") | |
address = 0x0400 | |
cpu_cycles = 0 | |
op_address = 0 | |
while True: | |
op_address += 1 | |
for __ in xrange(FAKED_CPU_LOAD): | |
cpu_cycles += 1 | |
if address > 0x0600: | |
op_address = 0 | |
address = 0x0400 | |
value = random.randrange(0, 255, 8) | |
data = (cpu_cycles, op_address, address, value) | |
#log.critical("new display data: %s", repr(data)) | |
try: | |
self.display_queue.put(data, block=False) | |
except Queue.Full: | |
# log.critical("display_queue is full -> put with wait") | |
self.display_queue.put(data, block=True) | |
address += 1 | |
class Dragon32TextDisplayTkinter(object): | |
""" | |
The GUI stuff | |
""" | |
CACHE = {} | |
def __init__(self, root): | |
self.rows = 32 | |
self.columns = 16 | |
self.width = 12 | |
self.height = 12 | |
self.total_width = self.width * self.rows | |
self.total_height = self.height * self.columns | |
self.canvas_image_id_map = {} | |
self.canvas = Tkinter.Canvas(root, | |
width=self.total_width, | |
height=self.total_height, | |
bd=0, # Border | |
bg="#ff0000", | |
) | |
self.img_count = 0 | |
self.next_update = time.time() + 1 | |
def write_byte(self, cpu_cycles, op_address, address, value): | |
self.img_count += 1 | |
#log.critical("create_image $%04x $%02x", address, value) | |
try: | |
img = self.CACHE[value] | |
except KeyError: | |
img = Tkinter.PhotoImage( | |
width=self.width, | |
height=self.height | |
) | |
for y in xrange(self.width): | |
for x in xrange(self.height): | |
color = "#%02x%02x%02x" % (value, value, value) | |
# #log.critical("%s %i x %i", color, x, y) | |
img.put(color, (x, y)) | |
self.CACHE[value] = img | |
position = address - 0x400 | |
column, row = divmod(position, self.rows) | |
x = self.width * row | |
y = self.height * column | |
try: | |
existing_image_id = self.canvas_image_id_map[(x, y)] | |
except KeyError: | |
image_id = self.canvas.create_image(x, y, | |
image=img, | |
state="normal", | |
anchor=Tkinter.NW # NW == NorthWest | |
) | |
self.canvas_image_id_map[(x, y)] = image_id | |
else: | |
self.canvas.itemconfigure(existing_image_id, image=img) | |
if time.time() > self.next_update: | |
log.critical("%i images/s (Cache size: %i)", self.img_count, len(self.CACHE)) | |
self.img_count = 0 | |
self.next_update = time.time() + 1 | |
class DragonTkinterGUI(object): | |
def __init__(self, display_queue): | |
self.display_queue = display_queue | |
self.root = Tkinter.Tk() | |
self.root.title("example") | |
self.display = Dragon32TextDisplayTkinter(self.root) | |
self.display.canvas.grid(row=0, column=0) | |
self.status_widget = Tkinter.Label(self.root, text="Press any key for log output\nand abort with Esc!") | |
self.status_widget.grid(row=1, column=0) | |
self.root.bind("<Key>", self.event_key_pressed) | |
self.root.update() | |
def event_key_pressed(self, event): | |
char_or_code = event.char or event.keycode | |
log.critical("keypress: %s", repr(char_or_code)) | |
if char_or_code == "\x1b": # Esc | |
log.critical("Escape!") | |
self.root.destroy() | |
def display_queue_interval(self, interval): | |
""" | |
consume all exiting "display RAM write" queue items and render them. | |
""" | |
max_time = time.time() + 0.25 | |
while True: | |
try: | |
cpu_cycles, op_address, address, value = self.display_queue.get_nowait() | |
except Queue.Empty: | |
#log.critical("display_queue empty -> exit loop") | |
# log.critical( | |
# "call display.write_byte() (display_queue._qsize(): %i)", | |
# self.display_queue._qsize() | |
# ) | |
break | |
self.display.write_byte(cpu_cycles, op_address, address, value) | |
if time.time() > max_time: | |
log.critical("Abort display_queue_interval() loop: %.4fs", (time.time() - max_time)) | |
self.root.update() | |
# break | |
self.root.after_idle(self.display_queue_interval, interval) | |
return | |
self.root.after(interval, self.display_queue_interval, interval) | |
def mainloop(self): | |
#log.critical("Start display_queue_interval()") | |
self.display_queue_interval(interval=50) | |
#log.critical("Start root.mainloop()") | |
self.root.mainloop() | |
#log.critical("root.mainloop() has quit!") | |
if __name__ == "__main__": | |
display_queue = Queue.Queue(maxsize=64) # Display RAM write outputs | |
g = DragonTkinterGUI(display_queue) | |
cpu = FakeCPU(display_queue) | |
cpu.deamon = True | |
cpu.start() | |
log.critical("mainloop start") | |
g.mainloop() | |
log.critical("mainloop end") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment