Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''Detects renamed and/or moved files by tracking inodes.
Creates a shell script to replay similar changes. Make sure to use relative
paths if you want to replay changes in a different absolute location. Does not
follow symbolic links. Inode numbers must be identical (do not cross
filesystems)!
'''
__author__ = 'Pavel Krc'
__email__ = 'src@pkrc.net'
__version__ = '1.1'
__copyright__ = 'Copyright (C) 2015 Pavel Krc'
__license__ = 'GPLv2+'
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import os
import re
# Emit a Python replay script instead of a shell script (more reliable).
generate_python_scripts = True

if generate_python_scripts:
    # Preamble of the generated Python script. The helper bodies have to be
    # indented inside the literal, otherwise the generated file does not parse.
    script_header = '''#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
def ren(a, b):
    print(b)
    assert(not os.path.exists(b))
    os.rename(a, b)
def mkd(d):
    print(d)
    os.mkdir(d)
'''
    mv_cmd = 'ren("{0}", "{1}")\n'
    mkdir_cmd = 'mkd("{0}")\n'
    # Inside a double-quoted Python literal only the quote itself and the
    # backslash are escaped here.
    escaped_chars = re.compile(r'(["\\])')
else:
    script_header = '#!/bin/sh\nset -e -v\n\n'
    mv_cmd = 'mv -n -T -- "{0}" "{1}"\n'
    mkdir_cmd = 'mkdir -- "{0}"\n'
    # Since single quotes cannot be escaped in sh, I have to use double quotes
    # and do a little bit more escaping. Fortunately regex does it efficiently.
    escaped_chars = re.compile(r'([`"\$\\])')


def esc(s):
    """Return *s* with each character matched by ``escaped_chars`` prefixed
    by a backslash, ready to be placed between the double quotes of
    ``mv_cmd`` / ``mkdir_cmd``."""
    return escaped_chars.sub(r'\\\1', s)
def dump_inodes(root, log_path):
    """Record the inode number of every directory and file under *root*.

    One line per entry is written to *log_path* in the form
    ``<D|F> <inode> <path>``. The first line describes *root* itself and each
    directory is listed before its contents (top-down), which is required for
    reconstruction. Inodes are read with ``os.lstat`` and the walk does not
    follow symbolic links.
    """
    # A trailing slash on root would produce 'root//child' on the first level
    # but 'root/child/grandchild' below it, so the logged paths would not nest
    # consistently and the parent lookup on reading would fail. A root made
    # only of slashes is left untouched.
    root = root.rstrip('/') or root
    record = '{0} {1:d} {2}\n'.format
    with open(log_path, 'w') as log:
        log.write(record('D', os.lstat(root).st_ino, root))
        for dpath, dnames, fnames in os.walk(root):
            prefix = dpath + '/'
            # directories of a level are logged before its files
            for kind, names in (('D', dnames), ('F', fnames)):
                for name in names:
                    path = prefix + name
                    log.write(record(kind, os.lstat(path).st_ino, path))
class DirEntry(object):
    """Directory record of the tracked tree.

    Stores the directory's path, the value identifying its parent and two
    initially empty sets, ``dirs`` and ``files``, for its children.
    """
    __slots__ = ('path', 'parent', 'dirs', 'files')

    def __init__(self, path, parent):
        self.path, self.parent = path, parent
        self.dirs, self.files = set(), set()
class FileEntry(object):
    """File record of the tracked tree: its path and the value identifying
    its parent directory."""
    __slots__ = ('path', 'parent')

    def __init__(self, path, parent):
        self.path, self.parent = path, parent
class MovingTree(object):
    """Recorded state of a directory tree, keyed by inode number.

    The state is loaded from an inode list (lines of the form
    ``<D|F> <inode> <path>``, root directory first, parents before children)
    and can be compared with the current file system to write a script that
    replays the detected moves.
    """

    def __init__(self, log_path):
        """Load the inode list stored at *log_path*.

        Raises:
            RuntimeError: if the list is empty, does not start with a
                directory record or contains an unknown record type.
            KeyError: if an entry's parent directory was not listed before it.
        """
        self.dirs = {}
        self.files = {}
        revdirs = {}  # path -> inode, used to resolve each entry's parent
        with open(log_path) as log:
            # root entry
            first = next(log, None)
            if first is None:
                raise RuntimeError(
                    'inode list {0!r} is empty'.format(log_path))
            df, ino, path = first.rstrip('\n').split(' ', 2)
            ino = int(ino)
            if df != 'D':
                raise RuntimeError(
                    'inode list {0!r} does not start with a directory record'
                    .format(log_path))
            self.root = path
            self.dirs[ino] = DirEntry(path, None)
            revdirs[path] = ino
            for ln in log:
                df, ino, path = ln.rstrip('\n').split(' ', 2)
                ino = int(ino)
                # the parent is everything before the last slash
                parent_ino = revdirs[path.rsplit('/', 1)[0]]
                if df == 'D':
                    self.dirs[ino] = DirEntry(path, parent_ino)
                    revdirs[path] = ino
                    self.dirs[parent_ino].dirs.add(ino)
                elif df == 'F':
                    self.files[ino] = FileEntry(path, parent_ino)
                    self.dirs[parent_ino].files.add(ino)
                else:
                    raise RuntimeError(
                        'unknown record type {0!r} in inode list {1!r}'
                        .format(df, log_path))

    def create_script(self, script_path):
        """Write the replay script for the detected changes to *script_path*.

        The file is created through os.open with mode 0o777 so that the
        script is executable - still, read it first!
        """
        fd = os.open(script_path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o777)
        try:
            out = os.fdopen(fd, 'w')
        except BaseException:
            # no file object owns the descriptor yet, so release it here
            os.close(fd)
            raise
        with out:
            out.write(script_header)
            self.detect_changes(out)

    def update_children(self, entry, orig_p, new_p):
        """Replace the path prefix *orig_p* by *new_p* in every recorded
        descendant of *entry*.

        Both prefixes are expected to end with a slash; each descendant path
        must start with *orig_p*.
        """
        plen = len(orig_p)
        for ino in entry.dirs:
            child = self.dirs[ino]
            assert child.path[:plen] == orig_p
            child.path = new_p + child.path[plen:]
            self.update_children(child, orig_p, new_p)
        for ino in entry.files:
            child = self.files[ino]
            assert child.path[:plen] == orig_p
            child.path = new_p + child.path[plen:]

    def _disparent(self, entry, ino, kind):
        """Remove *ino* from the *kind* ('dirs' or 'files') set of the
        recorded parent of *entry*, unless that parent was already processed."""
        try:
            parent_entry = self.dirs[entry.parent]
        except KeyError:
            return  # parent already processed
        getattr(parent_entry, kind).remove(ino)

    def detect_changes(self, script):
        """Compare the recorded state with the current tree below the
        recorded root and write the replay commands to *script*.

        Moves and directory creations are written as commands. Deleted
        directories, deleted files and newly created files are only listed
        as comments, followed by a summary of unchanged entries. Processed
        entries are removed from ``self.dirs`` / ``self.files``.
        """
        # The order of detecting changes is important. The safest order I could
        # think of was to start top-bottom according to destination (i.e.
        # safely constructing new state with guaranteed existing parents),
        # updating source data structures where necessary.
        newfiles = []
        ok_dirs = ok_files = 0
        for dpath, dnames, fnames in os.walk(self.root):
            dpath += '/'
            for n in dnames:
                p = dpath + n
                ino = os.lstat(p).st_ino
                try:
                    orig_entry = self.dirs.pop(ino)
                except KeyError:
                    # new directory
                    script.write(mkdir_cmd.format(esc(p)))
                    continue
                if orig_entry.path == p:
                    # existing directory, still in place
                    ok_dirs += 1
                    continue
                # moved
                script.write(mv_cmd.format(esc(orig_entry.path), esc(p)))
                self._disparent(orig_entry, ino, 'dirs')
                # moving under either freshly created or already
                # processed dir, so no need to register under new
                # parent.
                # update all children in the source tree
                self.update_children(orig_entry, orig_entry.path + '/', p + '/')
            for n in fnames:
                p = dpath + n
                ino = os.lstat(p).st_ino
                try:
                    orig_entry = self.files.pop(ino)
                except KeyError:
                    # new file - just log
                    newfiles.append(p)
                    continue
                if orig_entry.path == p:
                    # existing file, still in place
                    ok_files += 1
                    continue
                # moved
                script.write(mv_cmd.format(esc(orig_entry.path), esc(p)))
                self._disparent(orig_entry, ino, 'files')
        # list remaining unprocessed
        script.write('\n### Deleted directories ###\n')
        for p in sorted(e.path for e in self.dirs.values()
                        if e.path != self.root):
            script.write('#{0}\n'.format(p))
        script.write('\n### Deleted files ###\n')
        for p in sorted(e.path for e in self.files.values()):
            script.write('#{0}\n'.format(p))
        script.write('\n### Newly created files ###\n')
        for p in newfiles:
            script.write('#{0}\n'.format(p))
        script.write('\n### {0:d} dirs and {1:d} files have remained in place. ###\n'
                     .format(ok_dirs, ok_files))
if __name__ == '__main__':
    # Both actions need two path arguments after the action name; checking
    # the count first prints the usage text instead of failing with an
    # IndexError when arguments are missing.
    action = sys.argv[1] if len(sys.argv) >= 4 else None
    if action == 'dump':
        dump_inodes(sys.argv[2], sys.argv[3])
    elif action == 'detect':
        tr = MovingTree(sys.argv[2])
        tr.create_script(sys.argv[3])
    else:
        sys.exit('''Usage:
{0} dump {{root_path}} {{inode_list_path}}
Dumps inode numbers inside {{root_path}} to a new
file {{inode_list_path}}, thus recording current state.
{0} detect {{inode_list_path}} {{script_path}}
Compares recorded state within {{inode_list_path}} with current
state and creates a script to reconstruct detected changes.
'''.format(sys.argv[0]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.