Skip to content

Instantly share code, notes, and snippets.

@yxqsnz
Last active December 30, 2022 12:20
Show Gist options
  • Save yxqsnz/0a7b32a0c30ba77884889bfd04fdbffc to your computer and use it in GitHub Desktop.
Save yxqsnz/0a7b32a0c30ba77884889bfd04fdbffc to your computer and use it in GitHub Desktop.
#!/bin/python3
from sqlite3 import Cursor, connect
from sys import argv
from os import path, walk
from magic import Magic
from pyzstd import compress_stream
from io import BytesIO
# Two `magic.Magic` detectors, built once at import time and shared by guess():
# the default detector and one constructed with mime=True. Presumably the first
# yields a human-readable description and the second a MIME type string --
# confirm against the `magic` package documentation.
magic = Magic()
mime = Magic(mime=True)
def guess(path: str):
    """Identify the file at *path* with both module-level detectors.

    Returns a 2-tuple: the result of ``magic.from_file(path)`` followed by
    the result of ``mime.from_file(path)``.
    """
    description = magic.from_file(path)
    mime_type = mime.from_file(path)
    return description, mime_type
def process(cursor: Cursor, source_dir: str, compression_level: int):
    """Store every file found under *source_dir* as a compressed row.

    Args:
        cursor: Cursor used to run the ``INSERT OR REPLACE INTO file``
            statement for each file.
        source_dir: Directory tree traversed with ``walk``.
        compression_level: Forwarded to ``compress_stream`` as
            ``level_or_option``.

    Nothing is committed here; that is left to the caller.
    """
    # Lazily flatten the walk into full file paths, in traversal order.
    discovered = (
        path.join(folder, name)
        for folder, _, names in walk(source_dir)
        for name in names
    )
    for file_path in discovered:
        print(f"[*] Found {file_path}")
        description, mime_type = guess(file_path)
        print(f" [*] Which is: {description} and {mime_type}")
        with open(file_path, "rb") as raw:
            print(" [*] Compressing ...")
            with BytesIO() as packed:
                compress_stream(raw, packed, level_or_option=compression_level)
                # The buffer must be read before it is closed by the `with`.
                row = (file_path, mime_type, description, packed.getvalue())
                cursor.execute(
                    "INSERT OR REPLACE INTO file VALUES (?, ?, ?, ?)",
                    row,
                )
        print(" [*] Done!")
def main(args):
    """Pack a folder into a SQLite database via ``process``.

    Args:
        args: Command-line arguments without the program name:
            ``<SOURCE FOLDER> <TARGET sqlite3> [Zstd compression level]``.
            The compression level defaults to 1 when omitted.

    Returns:
        None. When the arguments are unusable (too few, a non-integer
        compression level, or a source that is not a directory) a message is
        printed and the function returns before the target database is opened.
    """
    if len(args) < 2:
        print(
            "Usage: folder-to-sqlite.py <SOURCE FOLDER> <TARGET sqlite3> [Zstd compression level]"
        )
        return

    print("[*] Starting ...")
    source, target = args[:2]

    zlv = 1
    if len(args) > 2:
        try:
            zlv = int(args[2])
        except ValueError:
            # Report bad input the same way as the other argument errors
            # instead of dying with a traceback.
            print("Please tell me a valid compression level!")
            return

    # Validate before connecting: connect() would otherwise create the target
    # database even though there is nothing to import.
    if not path.isdir(source):
        print("Please tell me a valid path!")
        return

    conn = connect(target)
    try:
        cursor = conn.cursor()
        try:
            # `name` is the primary key so that the INSERT OR REPLACE issued
            # per file actually replaces the row of a re-imported path;
            # without a uniqueness constraint every run appends duplicates.
            # Tables created by earlier versions are left as they are.
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS file "
                "(name TEXT PRIMARY KEY, mime TEXT, kind TEXT, data BLOB)"
            )
            process(cursor, source, zlv)
        finally:
            print("[*] Flushing and closing ...")
            cursor.close()
            # Committed even on failure so files stored so far are kept.
            conn.commit()
    finally:
        # Always release the connection, even if closing/committing raised.
        conn.close()
if __name__ == "__main__":
    # Hand main() everything after the program name.
    main(argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment