Skip to content

Instantly share code, notes, and snippets.

@CodeNinjaUsman
Created March 4, 2023 05:59
import os
import sys
import time
import logging
import sqlite3
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileEventHandler(FileSystemEventHandler):
    """Watchdog event handler that buffers events as plain dictionaries.

    Every callback appends one record to ``event_list``. A consumer is
    expected to read the records from that list and remove them.
    """

    def __init__(self):
        # Let the base class initialise itself before adding our own state.
        super().__init__()
        # Buffered event records, oldest first.
        self.event_list = []

    def _record(self, event_type, event, **extra):
        """Append a record for *event* tagged with *event_type*.

        Args:
            event_type: Label stored under the ``'event_type'`` key.
            event: Event object; its ``src_path`` and ``is_directory``
                attributes are copied into the record.
            **extra: Additional key/value pairs, placed between
                ``'src_path'`` and ``'is_directory'`` in the record.
        """
        self.event_list.append({
            'event_type': event_type,
            'src_path': event.src_path,
            **extra,
            'is_directory': event.is_directory,
        })

    def on_created(self, event):
        """Buffer a ``'created'`` record for *event*."""
        self._record('created', event)

    def on_modified(self, event):
        """Buffer a ``'modified'`` record for *event*."""
        self._record('modified', event)

    def on_moved(self, event):
        """Buffer a ``'moved'`` record for *event*, including ``dest_path``."""
        self._record('moved', event, dest_path=event.dest_path)

    def on_deleted(self, event):
        """Buffer a ``'deleted'`` record for *event*."""
        self._record('deleted', event)
def init_database(db_path='file_events.db'):
    """Create the ``file_events`` table if it does not exist yet.

    Args:
        db_path: Path of the SQLite database file to open. Defaults to
            ``'file_events.db'``, the name used before this parameter
            existed.

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement
            fails; the connection is closed before the error propagates.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS file_events
            (id INTEGER PRIMARY KEY AUTOINCREMENT,
            event_type TEXT,
            src_path TEXT,
            dest_path TEXT,
            is_directory INTEGER,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)
        ''')
        conn.commit()
    finally:
        # Close even when the statement raises so the handle is not leaked.
        conn.close()
def insert_event_data(data, db_path='file_events.db'):
    """Insert one event record into the ``file_events`` table.

    Args:
        data: Mapping with the keys ``'event_type'``, ``'src_path'`` and
            ``'is_directory'``; ``'dest_path'`` is optional and stored as
            an empty string when absent.
        db_path: Path of the SQLite database file to open. Defaults to
            ``'file_events.db'``, the name used before this parameter
            existed.

    Raises:
        KeyError: If a required key is missing from *data*.
        sqlite3.Error: If the insert fails; the connection is closed
            before the error propagates.
    """
    row = (
        data['event_type'],
        data['src_path'],
        data.get('dest_path', ''),
        data['is_directory'],
    )
    conn = sqlite3.connect(db_path)
    try:
        # Placeholders keep paths with quotes or other special characters safe.
        conn.execute('''
            INSERT INTO file_events (event_type, src_path, dest_path, is_directory)
            VALUES (?, ?, ?, ?)
        ''', row)
        conn.commit()
    finally:
        # Close even when the insert raises so the handle is not leaked.
        conn.close()
def _store_pending_events(events):
    """Write every buffered record in *events* to the database, oldest first."""
    while events:
        # Remove a record only after it has been stored, and take items one
        # at a time instead of iterate-then-clear: records appended while
        # this loop runs are then picked up rather than discarded.
        insert_event_data(events[0])
        del events[0]


def main(path):
    """Watch *path* recursively and store the observed events in SQLite.

    Starts an observer with a ``FileEventHandler``, then once per second
    writes the handler's buffered records to the database. Runs until
    interrupted with ``KeyboardInterrupt`` (Ctrl-C).

    Args:
        path: Directory to watch, including its subdirectories.
    """
    event_handler = FileEventHandler()
    print("Done")
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
            _store_pending_events(event_handler.event_list)
    except KeyboardInterrupt:
        pass
    finally:
        # Always shut the observer down, including when storing an event
        # fails, so it is not left running behind a propagating error.
        observer.stop()
        observer.join()
    # Reached only after a clean interrupt: store what was buffered since
    # the last poll so the final events are not lost.
    _store_pending_events(event_handler.event_list)
if __name__ == "__main__":
    # Make sure the events table exists before watching starts.
    init_database()
    # Watch the directory given as the first CLI argument, else the
    # current directory.
    if len(sys.argv) > 1:
        path = sys.argv[1]
    else:
        path = '.'
    main(path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment