Skip to content

Instantly share code, notes, and snippets.

@CodeZombie
Created December 1, 2023 18:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CodeZombie/d1fb7212266c62ebca3ad1149d6b89d1 to your computer and use it in GitHub Desktop.
Save CodeZombie/d1fb7212266c62ebca3ad1149d6b89d1 to your computer and use it in GitHub Desktop.
Given a huge directory of files, this script searches the content of each file and copies the file to a new directory if it contains a given substring. Threaded.
import os
import threading
import shutil
from collections import namedtuple
# Pairs a worker thread with the number of files that were handed to it, so
# that count can be added to the progress total once the thread has finished.
ThreadWithData = namedtuple('thread', 'thread, filecount')
# Constants
MAX_THREADS = 12  # upper bound on worker threads alive at the same time
FILES_PER_THREAD = 500  # how many filenames each worker thread is given
DIRECTORY = "./files_to_search"  # directory whose files are searched (top level only)
OUTPUT_DIRECTORY = "./output"  # where copies of matching files are written
SEARCH_STRING = "needle"  # substring to look for; compared case-insensitively
def get_files_in_directory(directory):
    """Return the names of the regular files directly inside ``directory``.

    Subdirectories are skipped and the listing is not recursive. Only bare
    file names are returned, not full paths.

    ``os.scandir`` is used instead of ``os.listdir`` + ``os.path.isfile``
    because it can usually answer "is this a file?" from the directory
    listing itself, avoiding one extra stat call per entry. That matters
    for the very large directories this script is meant for.

    Args:
        directory: Path of the directory to list.

    Returns:
        A list of file names, in the arbitrary order the OS reports them.

    Raises:
        OSError: If ``directory`` does not exist or cannot be read.
    """
    # The context manager closes the underlying directory handle promptly.
    with os.scandir(directory) as entries:
        return [entry.name for entry in entries if entry.is_file()]
def move_files_if_they_contain_string(files):
    """Copy every file in ``files`` whose text contains ``SEARCH_STRING``.

    Despite the function name, matching files are copied, not moved: the
    originals stay in ``DIRECTORY`` and a copy is written to
    ``OUTPUT_DIRECTORY`` under the same file name. The comparison is
    case-insensitive, and bytes that are not valid UTF-8 are ignored while
    reading.

    A file that cannot be read or copied is reported and skipped, so one bad
    file does not abort the rest of the batch.

    Args:
        files: Iterable of file names relative to ``DIRECTORY``.
    """
    # Loop-invariant: lower-case the search term once, not once per file.
    needle = SEARCH_STRING.lower()
    for file in files:
        source_path = os.path.join(DIRECTORY, file)
        try:
            with open(source_path, encoding="utf-8", errors="ignore") as f:
                found = needle in f.read().lower()
            if not found:
                continue
            print("Found \"" + SEARCH_STRING + "\" in " + file)
            # Create the destination on demand; exist_ok makes this safe when
            # several worker threads reach this point at the same time.
            os.makedirs(OUTPUT_DIRECTORY, exist_ok=True)
            # The source is already closed here, so the copy does not run
            # while this function still holds it open.
            shutil.copyfile(source_path, os.path.join(OUTPUT_DIRECTORY, file))
        except OSError as error:
            # An uncaught error would end this worker thread and silently
            # skip every remaining file in its batch.
            print("Skipping " + file + ": " + str(error))
def main():
    """Search every file in ``DIRECTORY`` using a bounded set of worker threads.

    The file list is cut into chunks of at most ``FILES_PER_THREAD`` names.
    Each chunk is processed by its own thread running
    ``move_files_if_they_contain_string``, with at most ``MAX_THREADS``
    threads alive at once. A running total is printed whenever one or more
    workers finish.
    """
    print("Starting...")
    files = get_files_in_directory(DIRECTORY)
    print("found " + str(len(files)) + " files.")

    threads = []
    analyzed_files = 0
    # Index of the first file not yet handed to a worker. Advancing an index
    # avoids re-copying the remaining list after every chunk.
    next_index = 0

    while next_index < len(files) or threads:
        # Separate finished workers from running ones and total up the
        # number of files the finished ones were responsible for.
        still_running = []
        finished_file_count = 0
        for worker in threads:
            if worker.thread.is_alive():
                still_running.append(worker)
            else:
                finished_file_count += worker.filecount
        threads = still_running

        if finished_file_count > 0:
            analyzed_files += finished_file_count
            print("Analyzed " + str(analyzed_files) + " files.")

        if len(threads) < MAX_THREADS and next_index < len(files):
            # Grab the next chunk of files to be checked in a new thread.
            # Slicing already clamps at the end of the list.
            files_to_check = files[next_index:next_index + FILES_PER_THREAD]
            next_index += len(files_to_check)
            t = threading.Thread(target=move_files_if_they_contain_string, args=(files_to_check,))
            t.start()
            threads.append(ThreadWithData(thread=t, filecount=len(files_to_check)))
        elif threads:
            # Nothing can be started right now. Block briefly on the oldest
            # worker instead of spinning, which would burn a CPU core and
            # compete with the workers for the interpreter lock.
            threads[0].thread.join(timeout=0.1)


# Run the search only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment