$ ./undo.py do 1
do: 1
$ ./undo.py do 2
do: 2
$ ./undo.py do 3
do: 3
$ ./undo.py do 4
do: 4
$ ./undo.py log
log: 2018-04-17 02:04:03.300630+00:00 4
log: 2018-04-17 02:04:02.295850+00:00 3
log: 2018-04-17 02:04:01.267907+00:00 2
log: 2018-04-17 02:04:00.022608+00:00 1
$ ./undo.py undo
undo: 4
$ ./undo.py log
log: 2018-04-17 02:04:02.295850+00:00 3
log: 2018-04-17 02:04:01.267907+00:00 2
log: 2018-04-17 02:04:00.022608+00:00 1
$ ./undo.py choices
choice:0: 4
$ ./undo.py do ABC
do: ABC
$ ./undo.py undo
undo: ABC
$ ./undo.py choices
choice:0: 4
choice:1: ABC
$ ./undo.py redo
redo: ABC
$ ./undo.py undo
undo: ABC
$ ./undo.py redo 0
redo: 4
$ ./undo.py log
log: 2018-04-17 02:04:03.300630+00:00 4
log: 2018-04-17 02:04:02.295850+00:00 3
log: 2018-04-17 02:04:01.267907+00:00 2
log: 2018-04-17 02:04:00.022608+00:00 1
Last active
April 17, 2018 02:45
-
-
Save tef/116aa482adc46877a49074005fa8adf6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sqlite3 | |
import sys | |
from contextlib import contextmanager | |
from datetime import datetime, timezone | |
from uuid import uuid4 | |
def UUID():
    """Return a newly generated random (version 4) UUID as a string."""
    fresh = uuid4()
    return str(fresh)
def NOW():
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)
class History:
    """Persistent, branching undo/redo log stored in SQLite.

    Three tables are used (created by ``setup``):

    * ``dos``      -- one row per recorded action; ``prev`` links to the key
      of the action that preceded it.
    * ``redos``    -- for a given key, the comma-joined keys of the actions
      that were undone from that point and may be redone.
    * ``sessions`` -- per session name, the ``current`` key (last completed
      action) and the ``active`` key (action being executed).

    ``do``, ``undo`` and ``redo`` are context managers.  Each one moves
    ``active`` before yielding and only moves ``current`` after the ``with``
    body finishes without raising.  While the two keys differ the log is
    "unclean" and ``do``/``undo``/``redo`` refuse to run; ``rollback_active``
    and ``restart_active`` bring the keys back together.
    """

    # Sentinel key for the position before the first recorded action.
    START = '__start__'

    def __init__(self, db, session):
        """Bind the history to a DB-API connection and a session name.

        Args:
            db: connection object providing ``cursor()`` and ``commit()``
                (the script passes a ``sqlite3`` connection).
            session: name of the row in ``sessions`` this history uses.
        """
        self.db = db
        self.session = session

    def ready(self):
        """Return True when no action is in flight (current == active)."""
        return self.current() == self.active()

    def setup(self):
        """Create the tables if needed and register this session.

        A new session starts with both ``current`` and ``active`` set to
        ``START``.  An already registered session is left untouched.
        """
        c = self.db.cursor()
        # a table of actions performed, with a link to the last action key
        c.execute("""CREATE TABLE IF NOT EXISTS dos (key, prev, timestamp, action)""")
        # a table of the actions that can follow on (redo)
        c.execute("""CREATE TABLE IF NOT EXISTS redos (key, dos)""")
        # current, active point to key in dos table
        # current being last executed
        # active being currently executed
        c.execute("""CREATE TABLE IF NOT EXISTS sessions (name, current, active )""")
        self.db.commit()
        if self.current() is None:
            c.execute(
                """insert into sessions (name, current, active) values (?, ?,?)""",
                (self.session, self.START, self.START,),
            )
            self.db.commit()

    def current(self):
        """Return the key of the last completed action, or None if the
        session has no row in ``sessions`` yet."""
        c = self.db.cursor()
        c.execute('select current from sessions where name = ?', (self.session,))
        buf = c.fetchone()
        if buf:
            return str(buf[0])
        return None

    def active(self):
        """Return the key of the action being executed, or None if the
        session has no row in ``sessions`` yet."""
        c = self.db.cursor()
        c.execute('select active from sessions where name = ?', (self.session,))
        buf = c.fetchone()
        if buf:
            return str(buf[0])
        return None

    def set_current(self, key):
        """Store ``key`` as the session's last completed action and commit."""
        c = self.db.cursor()
        c.execute("""update sessions set current = ? where name = ?""", (key, self.session,))
        self.db.commit()

    def set_active(self, key):
        """Store ``key`` as the session's executing action and commit."""
        c = self.db.cursor()
        c.execute("""update sessions set active = ? where name = ?""", (key, self.session,))
        self.db.commit()

    def get_do(self, key):
        """Return ``(prev, timestamp, action)`` as strings for the action
        stored under ``key``, or None when no such row exists."""
        c = self.db.cursor()
        c.execute('select prev, timestamp, action from dos where key = ?', (key,))
        buf = c.fetchone()
        if buf:
            return str(buf[0]), str(buf[1]), str(buf[2])
        return None

    def get_redos(self, key):
        """Return the list of redoable action keys recorded for ``key``.

        The keys are stored comma-joined in the ``dos`` column.  Returns an
        empty list when there is no row or the stored list is empty.
        """
        c = self.db.cursor()
        c.execute('select dos from redos where key = ?', (key,))
        buf = c.fetchone()
        if buf:
            # An emptied list is stored as '' and ''.split(',') is [''];
            # drop empty entries so it reads back as [] instead of a bogus
            # key that get_do() cannot resolve.
            return [do for do in str(buf[0]).split(",") if do]
        return []

    def set_redos(self, key, dos):
        """Store ``dos`` (a list of action keys) as the redo list of ``key``.

        Updates the existing row if there is one, otherwise inserts it, then
        commits.
        """
        c = self.db.cursor()
        dos = ",".join(dos)
        c.execute("""update redos set dos=? where key=?""", (dos, key))
        # Decide on row existence, not on whether the stored list is
        # non-empty: a row holding an empty list must be updated, not
        # shadowed by a second row for the same key.
        if c.rowcount == 0:
            c.execute("""insert INTO redos (key, dos) values (?,?)""", (key, dos))
        self.db.commit()

    def add_do(self, prev, action):
        """Insert a new action row following ``prev`` and return its key."""
        c = self.db.cursor()
        key = self.key_for(action)
        # Store the timestamp as text explicitly.  str(datetime) is the same
        # "YYYY-MM-DD HH:MM:SS.ffffff+00:00" form sqlite3's default datetime
        # adapter wrote, without depending on that adapter (deprecated since
        # Python 3.12).
        timestamp = str(NOW())
        c.execute(
            """INSERT INTO dos (key, prev, timestamp, action) values (?, ?,?,?)""",
            (key, prev, timestamp, action,),
        )
        self.db.commit()
        return key

    def key_for(self, action):
        """Return the key for a new action; currently a fresh UUID that does
        not depend on ``action``."""
        return UUID()

    def log(self):
        """Yield ``(timestamp, action)`` pairs from the current action back
        to the start, newest first."""
        current = self.current()
        while current != self.START:
            prev, ts, action = self.get_do(current)
            yield ts, action
            current = prev

    def choices(self):
        """Return the actions that can be redone from the current position,
        in stored order (the index is what ``redo(choice)`` accepts)."""
        out = []
        for do in self.get_redos(self.current()):
            prev, ts, action = self.get_do(do)
            out.append(action)
        return out

    @contextmanager
    def do(self, action):
        """Record ``action`` after the current one and yield it.

        The new record becomes ``active`` before the body runs and
        ``current`` only after the body completes; if the body raises, the
        log is left unclean.

        Raises:
            Exception: ``'Unclean log'`` if an action is already in flight.
        """
        if not self.ready():
            raise Exception('Unclean log')
        key = self.add_do(self.active(), action)
        self.set_active(key)
        yield action
        self.set_current(key)

    @contextmanager
    def undo(self):
        """Step back one action, yielding the action being undone.

        Yields None and changes nothing when already at ``START``.
        Otherwise ``active`` moves to the previous record before the body
        runs; after the body completes the undone key is appended to the
        previous record's redo list and ``current`` moves back.

        Raises:
            Exception: ``'Unclean log'`` if an action is already in flight.
        """
        if not self.ready():
            raise Exception('Unclean log')
        current = self.current()
        if current == self.START:
            yield None
        else:
            prev, timestamp, action = self.get_do(current)
            redos = self.get_redos(prev)
            redos.append(current)
            self.set_active(prev)
            yield action
            self.set_redos(prev, redos)
            self.set_current(prev)

    @contextmanager
    def redo(self, choice=None):
        """Re-apply an undone action, yielding that action.

        Args:
            choice: index into the current record's redo list (see
                ``choices``); None selects the last entry, i.e. the most
                recently undone action.

        Yields None and changes nothing when there is nothing to redo.

        Raises:
            Exception: ``'Unclean log'`` if an action is already in flight.
            IndexError: if ``choice`` is out of range (raised before any
                state is changed).
        """
        if not self.ready():
            raise Exception('Unclean log')
        current = self.current()
        redos = self.get_redos(current)
        if not redos:
            yield None
        else:
            choice = -1 if choice is None else choice
            target = redos.pop(choice)
            prev, timestamp, action = self.get_do(target)
            self.set_active(target)
            yield action
            self.set_redos(current, redos)
            self.set_current(target)

    @contextmanager
    def rollback_active(self):
        """Abandon an in-flight action, yielding it.

        Yields None when the log is clean.  Otherwise yields the active
        record's action and, once the body completes, resets ``active`` to
        ``current``.

        NOTE(review): the ``prev == current`` check matches an interrupted
        ``do``/``redo``; an interrupted ``undo`` leaves ``active`` on the
        earlier record, so it appears to be rejected here -- confirm intent.

        Raises:
            Exception: ``'disconnect'`` if the active record does not
                directly follow the current one.
        """
        if self.ready():
            yield None
        else:
            active = self.active()
            prev, ts, action = self.get_do(active)
            if prev != self.current():
                raise Exception('disconnect')
            yield action
            self.set_active(self.current())

    @contextmanager
    def restart_active(self):
        """Complete an in-flight action, yielding it.

        Yields None when the log is clean.  Otherwise yields the active
        record's action and, once the body completes, advances ``current``
        to ``active``.

        Raises:
            Exception: ``'disconnect'`` if the active record does not
                directly follow the current one.
        """
        if self.ready():
            yield None
        else:
            active = self.active()
            prev, ts, action = self.get_do(active)
            if prev != self.current():
                raise Exception('disconnect')
            yield action
            self.set_current(active)
# Command-line entry point: open (or create) undo.db, make sure the 'main'
# session exists, then run the sub-command named by the first argument.
# The connection's context manager ends the transaction; it does not close
# the connection.
with sqlite3.connect('undo.db') as db:
    history = History(db, 'main')
    history.setup()

    # First argument selects the sub-command; the rest are its parameters.
    # With no arguments at all, command is None and the usage branch runs.
    command, *params = sys.argv[1:] or [None]

    if command == 'do':
        # Everything after 'do' is joined into one action string.
        with history.do(" ".join(params)) as action:
            print('do:', action)
    elif command == 'undo':
        with history.undo() as action:
            print('undo:', action)
    elif command == 'redo':
        # Optional integer index into the list printed by 'choices'.
        choice = int(params[0]) if params else None
        with history.redo(choice) as action:
            print('redo:', action)
    elif command == 'choices':
        for n, choice in enumerate(history.choices()):
            print('choice:{}:'.format(n), choice)
    elif command == 'log':
        for timestamp, entry in history.log():
            print('log:', timestamp, entry)
    else:
        print("{}: do|undo|redo|choices|log".format(sys.argv[0]))
        sys.exit(-1)

sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment