# Source: GitHub Gist by @johnscillieri, created July 21, 2019 01:02
# Gist: johnscillieri/f5268e788412ae99ec6a6645dc9bda7a
"""Create a SQLite database from the filesystem

Usage:
    create_database.py [options] <path>
    create_database.py (-h | --help)
    create_database.py (-v | --version)

Options:
    -l --list       List the contents of the database
    -h --help       Show this screen.
    -v --version    Show version.

To Do:
    * This doesn't account for time collected vs time on the machine
    * Account for Unix attributes like owner, group, permissions
    * Normalize Windows paths
"""
import stat
import time
from datetime import datetime
from pathlib import Path

from docopt import docopt
from loguru import logger
from peewee import CharField, DateField, IntegerField, Model, SqliteDatabase
################################################################################
# Shared peewee handle for the SQLite file "filesystem.db". Record.Meta binds the
# model to it, and main() opens and closes the connection.
DATABASE = SqliteDatabase("filesystem.db")
################################################################################
class Record(Model):
    """ One filesystem entry (file, directory or symlink) stored as a table row """

    # Entry's own name and the directory that contains it
    name = CharField()
    path = CharField()
    # Number of components in the containing directory's path
    depth = IntegerField()
    size = IntegerField()
    # Kind of entry, e.g. "file", "directory", "symlink" or "unknown"
    record_type = CharField()
    created = DateField()
    modified = DateField()
    accessed = DateField()
    # Where a symlink points; empty string for everything else
    target = CharField()

    class Meta:
        """ Binds the model to the module-level database and declares its indexes """

        database = DATABASE
        # A single composite index over (name, path); the True flag makes it unique.
        # The outer tuple needs its trailing comma to remain a tuple of index specs.
        indexes = ((("name", "path"), True),)

    def __str__(self):
        """ Format the record as one ls-style line: size, modified time, full path """
        if self.record_type == "symlink":
            link_suffix = f" -> {self.target}"
        else:
            link_suffix = ""
        location = f"{self.path}/{self.name}"
        return f"{self.size:>7d} {self.modified} {location}{link_suffix}"
################################################################################
def _get_option(args, attribute, key):
    """ Read a CLI option from an attribute-style or a dict-style arguments object """
    try:
        return getattr(args, attribute)
    except AttributeError:
        # Plain docopt results are dicts keyed by the usage token ("<path>", "--list")
        return args[key]


def main(args):
    """ Update the sqlite database with the file system path provided

    ``args`` is the parsed command line. It may expose the options either as
    attributes (``args.path``, ``args.list``) or as docopt-style dictionary
    keys (``args["<path>"]``, ``args["--list"]``).

    Every entry found below the path is inserted into the Record table,
    replacing any existing row with the same name/path pair. When the list
    option is set, all stored records are printed afterwards.
    """
    path = _get_option(args, "path", "<path>")
    list_contents = _get_option(args, "list", "--list")

    start = datetime.now()
    logger.success(f"Inserting records for path: {path}")
    DATABASE.connect()
    try:
        DATABASE.create_tables([Record])
        # One transaction for the whole scan: either every row lands or none does
        with DATABASE.atomic():
            for dict_record in get_directory_information(path):
                Record.insert(**dict_record).on_conflict_replace().execute()
        if list_contents:
            for record in Record.select():
                print(record)
    finally:
        # Always release the connection, even when the scan or an insert fails
        DATABASE.close()
    logger.success(f"Done. ({datetime.now()-start})")
def _path_text(path):
    """ Render a path as UTF-8 text, replacing bytes that are not valid UTF-8 """
    return bytes(path).decode("utf-8", errors="replace")


def _record_type(mode):
    """ Map the st_mode of an lstat() result to the record_type string to store """
    if stat.S_ISLNK(mode):
        return "symlink"
    if stat.S_ISREG(mode):
        return "file"
    if stat.S_ISDIR(mode):
        return "directory"
    # Sockets, FIFOs, device nodes and anything else
    return "unknown"


def get_directory_information(directory):
    """ Generator used to recurse the file system and yield Records for each item

    Each entry below ``directory`` is described from its own lstat() result, so
    a symlink is always reported as a symlink (with its resolved target) rather
    than as whatever it points to. Entries that cannot be stat'ed are logged
    and skipped.

    Yields one dict per entry with the keys ``name``, ``path``, ``depth``,
    ``size``, ``record_type``, ``created``, ``modified``, ``accessed`` and
    ``target``, ready to be passed to ``Record.insert``.
    """
    current_dir = None
    parent = None
    parent_text = ""
    for name in Path(directory).rglob("*"):
        # Resolve and decode the containing directory once per directory,
        # not once per entry
        if name.parent != current_dir:
            current_dir = name.parent
            parent = current_dir.resolve()
            parent_text = _path_text(parent)
            logger.info(f"Now in path: {parent}")

        try:
            record_info = name.lstat()
        except OSError as exc:
            # The entry vanished or is not accessible: keep scanning the rest
            logger.error(f"stat() {type(exc).__name__} - {exc} - Skipping!")
            continue

        # Classify from the lstat() mode so symlinks are never followed
        record_type = _record_type(record_info.st_mode)

        symlink_target = ""
        if record_type == "symlink":
            try:
                symlink_target = _path_text(name.resolve())
            except (OSError, RuntimeError) as exc:
                # resolve() can fail on symlink loops or unreadable components
                logger.error(f"resolve() {type(exc).__name__} - {exc} - Leaving target empty!")

        # NOTE(review): the timestamps are human-readable time.ctime() strings, and
        # "created" is taken from st_ctime. Confirm both are what the date columns
        # are meant to hold.
        yield {
            "name": _path_text(Path(name.name)),
            "path": parent_text,
            "depth": len(parent.parts),
            "size": record_info.st_size,
            "record_type": record_type,
            "created": time.ctime(record_info.st_ctime),
            "modified": time.ctime(record_info.st_mtime),
            "accessed": time.ctime(record_info.st_atime),
            "target": symlink_target,
        }
if __name__ == "__main__":
    # Parse the command line against the module docstring, then run the update
    cli_arguments = docopt(__doc__, version="1.0")
    main(cli_arguments)
# (GitHub page footer: sign in on GitHub to comment on this gist.)