"""Pause/resume/load/save a long data analysis process."""
import datetime
import pickle
import time
import tkinter as tk
from dataclasses import dataclass
from enum import Enum
from functools import partial
from multiprocessing import Process
from multiprocessing.connection import Connection, Pipe
from pathlib import Path
from typing import List, BinaryIO
# ******************************************************************************
# ************************* ADAPT THIS TO YOUR NEEDS *************************
# ******************************************************************************
# Frozen data.
# ============
# This is your initial data, the raw data that comes from the outside world and
# does not need to change.
@dataclass
class FrozenData:
    table: List[List[float]]
    scale_factor: float
# Working data.
# =============
# This is the temporary data that you need during your processing. It is the
# data that you save on the hard drive when you need to unplug your laptop.
# Do not copy the frozen data here.
@dataclass
class WorkingData:
    row_id: int  # The next task to accomplish.
    so_far: List  # Cumulated result of the analysis.
    # So here I use a ``List`` for ``so_far``, because that's the free monoid,
    # the most general one there is, the monoid that does nothing but put
    # stuff together. It's initialized in ``init_working_data`` as ``[]``, and
    # updated after each completed task by ``+`` in ``update_working_data``.
    #
    # You don't have to use [] and +; you can use any other monoid. Do some
    # fold/reduce on the fly if you want, that will save memory. Or just
    # keep the last value. Replacing a value by another is also a monoid.
    # (A hedged sketch of such a variant follows ``update_working_data`` below.)
# Final data.
# ===========
# This is what you want. Maybe it's equal to the ``so_far`` of the working
# data, but if you need to do more processing at the end (like taking medians
# or other things that aren't easy to do on the fly) then FinalData will
# collect the real good stuff.
@dataclass
class FinalData:
    n: int  # We analyzed all that just to get ONE int out? Brilliant.
def create_mock_frozen_data() -> FrozenData:
    import random
    rnd = random.Random(x=0)  # x is the seed, I want reproducibility.
    # 10000 rows of 10 columns of crap.
    table = [
        [rnd.random() for c in range(10)]
        for r in range(10000)
    ]
    return FrozenData(table=table, scale_factor=10.0)
# Create and update data.
# =======================
def init_working_data(_: FrozenData) -> WorkingData:
    # Maybe creating the working data requires info from the frozen data.
    # I can't think of why, but you've got the option.
    return WorkingData(
        row_id=0,   # Next task to perform.
        so_far=[],  # Monoid identity.
    )
def update_working_data(fd: FrozenData, wd: WorkingData) -> WorkingData:
    # Find out what our next task is.
    row_id = wd.row_id
    # Get a handle on the frozen data that's relevant for this task.
    row = fd.table[row_id]
    # Do the actual analysis.
    row_result = (
        # Throw in a time stamp, just in case, for performance analysis later.
        # Also would reveal if we've paused the processing.
        # Yes it's not pure, sue me.
        datetime.datetime.utcnow(),
        # Here comes the serious analysis :D.
        len(row),
        min(row) * fd.scale_factor,
        max(row) * fd.scale_factor,
    )
    # Pretend it's hard so you have time to test the command line.
    time.sleep(0.1)  # TODO PLEASE REMOVE THAT
    # Merge the result for this task with the previous results.
    return WorkingData(
        row_id=row_id + 1,
        so_far=wd.so_far + [row_result],  # Monoid operation.
    )
def create_final_data(wd: WorkingData) -> FinalData:
    # I hope you want to do more with your data than just counting
    # how many rows it had.
    return FinalData(len(wd.so_far))
# ******************************************************************************
# ***************************** COMMON MACHINERY *****************************
# ******************************************************************************
# Save and load Working and Final data.
# =====================================
def data_save(f: BinaryIO, obj):
    pickle.dump(obj, f)


def data_load(f: BinaryIO):
    return pickle.load(f)
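

# A hedged aside, not part of the original gist: pickling rewrites the whole
# file in place, so a crash in the middle of a SAVE would corrupt the only
# checkpoint. One common guard is to write to a sibling temporary file and
# then atomically replace the old one. The helper name ``data_save_atomic``
# is hypothetical and not used by the rest of the script.
def data_save_atomic(path: Path, obj):
    tmp = path.with_name(path.name + '.part')
    with tmp.open('wb') as f:
        pickle.dump(obj, f)
    # Atomic on POSIX when both paths live on the same filesystem.
    tmp.replace(path)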
class SlaveState(Enum):
    WORKING = 'working'
    RESTING = 'resting'


class Commands(Enum):
    WORK = 'work'
    REST = 'rest'
    LOAD = 'load'
    SAVE = 'save'
    ABORT = 'abort'
    REPORT = 'report'
# Slave process: the process that does the actual work.
# =====================================================
SLEEP_PERIOD = 0.1  # second.


def slave_loop(final_save_path: Path,
               tmp_save_path: Path,
               commands: Connection,
               progress: Connection,
               fd: FrozenData,
               wd: WorkingData):
    state = SlaveState.RESTING
    while True:
        # Listen to commands and obey them.
        while commands.poll():
            cmd = commands.recv()
            if cmd == Commands.WORK:
                state = SlaveState.WORKING
            elif cmd == Commands.REST:
                state = SlaveState.RESTING
            elif cmd == Commands.SAVE:
                with tmp_save_path.open('wb') as f:
                    data_save(f, wd)
            elif cmd == Commands.LOAD:
                with tmp_save_path.open('rb') as f:
                    wd = data_load(f)
            elif cmd == Commands.REPORT:
                progress.send((state, wd.row_id, len(fd.table)))
            elif cmd == Commands.ABORT:
                return
            else:
                raise ValueError(cmd)
        # Perform the action for the current state.
        if state == SlaveState.WORKING:
            if wd.row_id >= len(fd.table):
                final_data = create_final_data(wd)  # <=== Actual work.
                with final_save_path.open('wb') as f:
                    data_save(f, final_data)
                progress.send((state, wd.row_id, len(fd.table)))
                progress.send('done')
                return
            wd = update_working_data(fd, wd)  # <=== Actual work.
        elif state == SlaveState.RESTING:
            # Don't hog CPU.
            time.sleep(SLEEP_PERIOD)
        else:
            raise ValueError(state)
# Master process: the process that controls the slave process.
# ============================================================
parse_command = {
    # There probably is a better way to transpose Commands.
    'work': Commands.WORK,
    'rest': Commands.REST,
    'save': Commands.SAVE,
    'load': Commands.LOAD,
    'abort': Commands.ABORT,
}
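

# A hedged aside, not part of the original gist: the "better way to transpose
# Commands" could be a dict comprehension over the enum, skipping REPORT since
# reports are requested automatically by the master loops. The name
# ``parse_command_alt`` is hypothetical and unused.
parse_command_alt = {c.value: c for c in Commands if c is not Commands.REPORT}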
def master_loop_command_line(commands: Connection, progress: Connection):
    # So we're going to control the slave with the most terrible command line
    # in the universe.
    # What happens if the slave has finished its job, or crashed, and isn't
    # replying anymore? I guess it's a surprise. (A hedged sketch after this
    # function shows one way to cope with that.)
    # Print the user manual.
    print('Valid commands: {}.'.format(', '.join(parse_command)))
    while True:
        commands.send(Commands.REPORT)
        progress_reported = progress.recv()  # Wait for at least one report.
        while progress.poll():
            # If more reports are queued, flush them and keep only the latest.
            progress_reported = progress.recv()
        if progress_reported == 'done':
            print('Slave is done')
            return
        print('Progress: ', progress_reported)
        s = input('> ')
        cmd = parse_command.get(s, None)
        if cmd:
            commands.send(cmd)
            if cmd == Commands.ABORT:
                return
        elif s:  # If the user just presses Enter, don't scream.
            print('Invalid command.')
            print('Valid commands: {}.'.format(', '.join(parse_command)))
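

# A hedged sketch, not part of the original gist, answering the "surprise"
# above: one way to avoid blocking forever on a slave that has finished or
# crashed is to wait for its report with a timeout instead of a bare recv().
# The helper name ``request_report`` is hypothetical and not wired into the
# loops above.
def request_report(commands: Connection, progress: Connection, timeout: float = 2.0):
    commands.send(Commands.REPORT)
    if not progress.poll(timeout):
        return None  # No answer within the timeout: the slave is probably gone.
    report = progress.recv()
    while progress.poll():  # Flush any extra queued reports, keep the latest.
        report = progress.recv()
    return report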
def master_loop_gui(commands: Connection, progress: Connection):
    log_period = 1000  # milliseconds.

    def send_command(s):
        line = f'{datetime.datetime.now()}: command: {s}\n'
        log(line)
        cmd = parse_command[s]
        commands.send(cmd)
        if cmd == Commands.ABORT:
            root.destroy()

    root = tk.Tk()
    parent = root
    # Command buttons.
    frame = tk.Frame(parent)
    frame.pack(fill=tk.X, expand=False)
    parent = frame
    row = 0
    for column, text in enumerate(parse_command):
        b = tk.Button(parent, text=text, command=partial(send_command, text))
        b.grid(row=row, column=column, sticky=tk.NSEW)
        parent.columnconfigure(index=column, weight=1, uniform='whatev')
    row += 1
    parent = root
    # Status log.
    frame = tk.Frame(parent)
    frame.pack(fill=tk.BOTH, expand=True)
    parent = frame
    scroll_bar = tk.Scrollbar(parent)
    scroll_bar.pack(side=tk.RIGHT, fill=tk.Y, expand=False)
    status_log = tk.Text(parent, yscrollcommand=scroll_bar.set)
    status_log.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scroll_bar.config(command=status_log.yview)

    def log(txt):
        status_log.insert(tk.END, txt)
        if scroll_bar.get()[1] == 1.0:
            # Auto-scroll down if the scrollbar is already at the end.
            # Otherwise don't: let the user look at the log.
            status_log.see(tk.END)

    def show_status():
        commands.send(Commands.REPORT)
        progress_reported = progress.recv()  # Wait for at least one report.
        while progress.poll():
            # If more reports are queued, flush them and keep only the latest.
            progress_reported = progress.recv()
        line = f'{datetime.datetime.now()}: status: {progress_reported}\n'
        log(line)
        root.after(log_period, show_status)
    def on_closing():
        send_command('abort')

    # Kick off the periodic status reports, then hand control to Tk.
    show_status()
    root.protocol("WM_DELETE_WINDOW", on_closing)
    root.mainloop()
# Start the whole thing.
# ======================
def _main():
    # Configure.
    final_save_path = Path('final.data')
    tmp_save_path = Path('tmp.data')
    # Load the frozen data.
    fd = create_mock_frozen_data()
    wd = init_working_data(fd)
    master_commands, slave_commands = Pipe()
    master_progress, slave_progress = Pipe()
    slave = Process(
        target=slave_loop,
        args=(
            final_save_path,
            tmp_save_path,
            slave_commands,
            slave_progress,
            fd,
            wd,
        )
    )
    slave.start()
    # master_loop_command_line(master_commands, master_progress)
    master_loop_gui(master_commands, master_progress)
    slave.join()
    try:
        with final_save_path.open('rb') as f:
            final = data_load(f)
        print('Final result:')
        print(final)
    except FileNotFoundError:
        print('Work is not finished')
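

# Usage notes (an aside, not part of the original gist), derived from the
# commands above: running the script opens a Tk window with work/rest/save/
# load/abort buttons. Press "work" to start crunching rows, "save" to
# checkpoint the working data to tmp.data, and "abort" (or close the window)
# to stop early. On a later run, press "load" and then "work" to resume from
# the checkpoint. Once every row is processed, the slave writes final.data
# and the script prints the final result.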
if __name__ == '__main__':
    _main()