Skip to content

Instantly share code, notes, and snippets.

@tef
Last active April 17, 2018 02:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tef/116aa482adc46877a49074005fa8adf6 to your computer and use it in GitHub Desktop.
Save tef/116aa482adc46877a49074005fa8adf6 to your computer and use it in GitHub Desktop.

Persistent undo/redo example (SQLite-backed history with selectable redo branches)

$ ./undo.py do 1
do: 1

$ ./undo.py do 2
do: 2

$ ./undo.py do 3
do: 3

$ ./undo.py do 4
do: 4

$ ./undo.py log
log: 2018-04-17 02:04:03.300630+00:00 4
log: 2018-04-17 02:04:02.295850+00:00 3
log: 2018-04-17 02:04:01.267907+00:00 2
log: 2018-04-17 02:04:00.022608+00:00 1

$ ./undo.py undo
undo: 4

$ ./undo.py log
log: 2018-04-17 02:04:02.295850+00:00 3
log: 2018-04-17 02:04:01.267907+00:00 2
log: 2018-04-17 02:04:00.022608+00:00 1

$ ./undo.py choices
choice:0: 4

$ ./undo.py do ABC
do: ABC

$ ./undo.py undo
undo: ABC

$ ./undo.py choices
choice:0: 4
choice:1: ABC

$ ./undo.py redo 
redo: ABC

$ ./undo.py undo
undo: ABC

$ ./undo.py redo 0
redo: 4

$ ./undo.py log
log: 2018-04-17 02:04:03.300630+00:00 4
log: 2018-04-17 02:04:02.295850+00:00 3
log: 2018-04-17 02:04:01.267907+00:00 2
log: 2018-04-17 02:04:00.022608+00:00 1
#!/usr/bin/env python3
import sqlite3
import sys
from contextlib import contextmanager
from datetime import datetime, timezone
from uuid import uuid4
def UUID():
    """Return a freshly generated random UUID (version 4) as a string."""
    fresh = uuid4()
    return str(fresh)
def NOW():
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
class History:
    """Persistent undo/redo history for one named session, stored in SQLite.

    Three tables are used:

    * ``dos``      -- every action performed, each linked to the key of the
                      action that preceded it (``prev``).
    * ``redos``    -- for a given do key, the comma-separated keys of the
                      actions that can follow on from it (the redo choices).
    * ``sessions`` -- per session, two pointers into ``dos``: ``current`` is
                      the last action that finished executing and ``active``
                      is the action being executed right now.

    ``do``, ``undo`` and ``redo`` are context managers: they move ``active``
    before yielding and move ``current`` only after the ``with`` body
    finishes normally.  If the body raises, the two pointers are left
    different on purpose, so the next call sees an "Unclean log" until
    ``rollback_active`` or ``restart_active`` resolves it.
    """

    # Sentinel key marking the beginning of the history (no do record exists
    # for it).
    START = '__start__'

    def __init__(self, db, session):
        """Remember the database connection and the session name to use."""
        self.db = db
        self.session = session

    def ready(self):
        """Return True when no action is in flight (current == active)."""
        return self.current() == self.active()

    def setup(self):
        """Create the tables if missing and register this session if new."""
        c = self.db.cursor()
        # a table of actions performed, with a link to the last action key
        c.execute("""CREATE TABLE IF NOT EXISTS dos (key, prev, timestamp, action)""")
        # a table of the actions that can follow on (redo)
        c.execute("""CREATE TABLE IF NOT EXISTS redos (key, dos)""")
        # current, active point to key in dos table
        # current being last executed
        # active being currently executed
        c.execute("""CREATE TABLE IF NOT EXISTS sessions (name, current, active )""")
        self.db.commit()
        if self.current() is None:
            # No row for this session yet: start both pointers at START.
            c.execute(
                """insert into sessions (name, current, active) values (?, ?,?)""",
                (self.session, self.START, self.START,),
            )
            self.db.commit()

    def current(self):
        """Get the last Do Record Key, or None if the session has no row."""
        c = self.db.cursor()
        c.execute('select current from sessions where name = ?', (self.session,))
        buf = c.fetchone()
        if buf:
            return str(buf[0])

    def active(self):
        """Get the do record that is happening now, or None if no session row."""
        c = self.db.cursor()
        c.execute('select active from sessions where name = ?', (self.session,))
        buf = c.fetchone()
        if buf:
            return str(buf[0])

    def set_current(self, key):
        """Set the current do record"""
        c = self.db.cursor()
        c.execute("""update sessions set current = ? where name = ?""", (key, self.session,))
        self.db.commit()

    def set_active(self, key):
        """Set which do record is executing"""
        c = self.db.cursor()
        c.execute("""update sessions set active = ? where name = ?""", (key, self.session,))
        self.db.commit()

    def get_do(self, key):
        """Get a do row from the table.

        Returns a ``(prev, timestamp, action)`` tuple of strings, or None
        when no row has that key.
        """
        c = self.db.cursor()
        c.execute('select prev, timestamp, action from dos where key = ?', (key,))
        buf = c.fetchone()
        if buf:
            return str(buf[0]), str(buf[1]), str(buf[2])

    def get_redos(self, key):
        """Get the redo row from the table, splitting the dos column.

        Returns the list of do keys that can be redone from ``key``; an
        empty list when there is no row or the row holds no keys.
        """
        c = self.db.cursor()
        c.execute('select dos from redos where key = ?', (key,))
        buf = c.fetchone()
        if buf:
            # "".split(",") is [""], not []: drop empty pieces so a row whose
            # choices were all consumed reads back as "no redos" instead of
            # a bogus empty key.
            return [do for do in str(buf[0]).split(",") if do]
        return []

    def set_redos(self, key, dos):
        """Store ``dos`` (a list of do keys) as the redo choices for ``key``."""
        c = self.db.cursor()
        dos = ",".join(dos)
        # Test for the row itself rather than for a non-empty key list, so a
        # row that currently holds no keys is updated, not duplicated.
        c.execute('select 1 from redos where key = ?', (key,))
        if c.fetchone() is not None:
            c.execute("""update redos set dos=? where key=?""", (dos, key))
        else:
            c.execute("""insert INTO redos (key, dos) values (?,?)""", (key, dos))
        self.db.commit()

    def add_do(self, prev, action):
        """Insert a new do record following ``prev`` and return its key."""
        c = self.db.cursor()
        key = self.key_for(action)
        # Store the timestamp as text explicitly: binding a datetime relies
        # on sqlite3's default adapter, which is deprecated since Python
        # 3.12.  str() of a datetime is the same text that adapter wrote.
        timestamp = str(NOW())
        c.execute(
            """INSERT INTO dos (key, prev, timestamp, action) values (?, ?,?,?)""",
            (key, prev, timestamp, action,),
        )
        self.db.commit()
        return key

    def key_for(self, action):
        """Return a new unique key for a do record (``action`` is unused)."""
        return UUID()

    def log(self):
        """Yield ``(timestamp, action)`` from the current record back to START."""
        current = self.current()
        while current != self.START:
            prev, ts, action = self.get_do(current)
            yield ts, action
            current = prev

    def choices(self):
        """Return the actions that can be redone from the current record."""
        return [self.get_do(do)[2] for do in self.get_redos(self.current())]

    @contextmanager
    def do(self, action):
        """Record ``action`` as a new do and yield it for execution.

        Raises Exception('Unclean log') if another action is in flight.
        ``current`` advances only if the ``with`` body completes.
        """
        if not self.ready():
            raise Exception('Unclean log')
        key = self.add_do(self.active(), action)
        self.set_active(key)
        yield action
        self.set_current(key)

    @contextmanager
    def undo(self):
        """Yield the action to undo, or None when already at START.

        After the body completes, the undone record is appended to the redo
        choices of its predecessor and ``current`` moves back to it.
        Raises Exception('Unclean log') if another action is in flight.
        """
        if not self.ready():
            raise Exception('Unclean log')
        current = self.current()
        if current == self.START:
            yield None
            return
        prev, timestamp, action = self.get_do(current)
        redos = self.get_redos(prev)
        redos.append(current)
        self.set_active(prev)
        yield action
        self.set_redos(prev, redos)
        self.set_current(prev)

    @contextmanager
    def redo(self, choice=None):
        """Yield the action to redo, or None when there is nothing to redo.

        ``choice`` is an index into the redo choices of the current record
        (see ``choices``); None selects the last one.  An out-of-range
        index raises IndexError before any state is changed.
        Raises Exception('Unclean log') if another action is in flight.
        """
        if not self.ready():
            raise Exception('Unclean log')
        current = self.current()
        redos = self.get_redos(current)
        if not redos:
            yield None
            return
        choice = -1 if choice is None else choice
        target = redos.pop(choice)
        prev, timestamp, action = self.get_do(target)
        self.set_active(target)
        yield action
        self.set_redos(current, redos)
        self.set_current(target)

    def _pending(self):
        """Return ``(active_key, action)`` for the in-flight do record.

        Raises Exception('disconnect') when the active record is missing or
        does not directly follow the current record.
        """
        active = self.active()
        record = self.get_do(active)
        # A missing record (e.g. active is START) is reported the same way
        # as a mismatched link instead of failing on tuple unpacking.
        if record is None or record[0] != self.current():
            raise Exception('disconnect')
        return active, record[2]

    @contextmanager
    def rollback_active(self):
        """Yield the in-flight action so it can be abandoned.

        Yields None when nothing is in flight.  After the body completes,
        ``active`` is reset to ``current``.
        """
        if self.ready():
            yield None
            return
        _, action = self._pending()
        yield action
        self.set_active(self.current())

    @contextmanager
    def restart_active(self):
        """Yield the in-flight action so it can be run again to completion.

        Yields None when nothing is in flight.  After the body completes,
        ``current`` is advanced to ``active``.
        """
        if self.ready():
            yield None
            return
        active, action = self._pending()
        yield action
        self.set_current(active)
# Command-line entry point: ./undo.py do <text> | undo | redo [n] | choices | log
with sqlite3.connect('undo.db') as db:
    history = History(db, 'main')
    history.setup()

    # Split the arguments into the sub-command and whatever follows it;
    # a missing sub-command falls through to the usage message below.
    command, *rest = sys.argv[1:] or [None]

    if command == 'do':
        # Everything after "do" is joined into a single action string.
        with history.do(" ".join(rest)) as action:
            print('do:', action)
    elif command == 'undo':
        with history.undo() as action:
            print('undo:', action)
    elif command == 'redo':
        # Optional integer selects one of the entries listed by "choices".
        choice = int(rest[0]) if rest else None
        with history.redo(choice) as action:
            print('redo:', action)
    elif command == 'choices':
        for n, choice in enumerate(history.choices()):
            print('choice:{}:'.format(n), choice)
    elif command == 'log':
        for timestamp, entry in history.log():
            print('log:', timestamp, entry)
    else:
        print("{}: do|undo|redo|choices|log".format(sys.argv[0]))
        sys.exit(-1)
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment