Skip to content

Instantly share code, notes, and snippets.

@ChaoticMind
Created November 27, 2023 19:25
Show Gist options
  • Save ChaoticMind/3a0683038c156904698f0455271149cc to your computer and use it in GitHub Desktop.
Save ChaoticMind/3a0683038c156904698f0455271149cc to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os
import subprocess
from twisted.internet import reactor, inotify, stdio
from twisted.protocols import basic
from twisted.python.filepath import FilePath
class overriddenInotify(inotify.INotify):
    """``INotify`` variant that prints a trace line for two lifecycle hooks.

    Both overrides only add a ``print`` and then defer to the parent
    implementation, so watch handling itself is unchanged.
    """

    def _rmWatch(self, wd):
        """Log that a watch is being removed, then let ``INotify`` remove it.

        ``wd`` is passed through to the parent unchanged.
        """
        print("called _rmWatch...")
        parent = super()
        parent._rmWatch(wd)

    def connectionLost(self, reason):
        """Log that the inotify descriptor is closing, then delegate upward.

        ``reason`` is passed through to the parent unchanged.
        """
        print("closing inotify file descriptor")
        parent = super()
        parent.connectionLost(reason)
class stdinProtocol(basic.LineReceiver):
    """Line-oriented stdin console for exercising inotify watches.

    Every line received is interpreted as a command: ``register``,
    ``unregister``, ``status``, ``move_dir`` or ``rmdir``.  Any other input
    (including an empty line) prints the list of commands.  The commands
    create, watch, move and remove scratch directories so the number of
    inotify file descriptors held by this process can be checked with
    ``status``.
    """

    delimiter = b"\n"
    # Scratch directories that ``register`` (re)creates and watches.
    directory0 = "/tmp/deleteme0"
    directory1 = "/tmp/deleteme1"
    # Destination of the ``move_dir`` command.
    move_dst = "/tmp/deleteme2"
    _path0 = None
    _path1 = None
    _notifier = None

    def __init__(self):
        """Build ``FilePath`` objects for the two scratch directories."""
        self._path0 = FilePath(self.directory0)
        self._path1 = FilePath(self.directory1)

    def _tracked_paths(self):
        """Return the scratch directory paths in registration order."""
        return (self._path0, self._path1)

    def create_or_recreate_if_exists(self):
        """Remove the scratch directories if present, then create them anew."""
        for directory, path in zip(
            (self.directory0, self.directory1), self._tracked_paths()
        ):
            if path.exists():
                print(f"Removing directory: {directory}")
                path.remove()
        print(f"Creating directory: {self.directory0}")
        print(f"Creating directory: {self.directory1}")
        for path in self._tracked_paths():
            path.makedirs()

    def connectionMade(self):
        """Greet the user with the process id and print the command list."""
        self.sendLine(f"[pid: {os.getpid()}] Connected to stdin...")
        self.lineReceived(b"")

    def sendLine(self, line):
        """Send ``line`` followed by the delimiter.

        ``line`` may be ``bytes`` (sent as-is) or any other object, which is
        converted with ``str()`` and encoded.  Passing ``bytes`` through
        avoids emitting their ``b'...'`` repr.
        """
        if not isinstance(line, bytes):
            line = str(line).encode()
        super().sendLine(line)

    def _init_notifier(self):
        """Create and start the notifier unless one is already active."""
        if self._notifier is None:
            self._notifier = overriddenInotify()
            self._notifier.startReading()

    def _register_notifier(self):
        """Recreate the scratch directories and watch both of them.

        If the inotify library raises ``INotifyError`` the notifier and the
        stdin transport are both closed.
        """
        self.create_or_recreate_if_exists()
        self._init_notifier()

        # Clear (rather than toggle) the bit: ``^=`` would *add*
        # IN_DELETE_SELF if the base mask ever stopped containing it.
        mask = inotify.IN_WATCH_MASK & ~inotify.IN_DELETE_SELF

        for path in self._tracked_paths():
            self.sendLine(f"Registering notifier on {path.path}")

        try:
            for path in self._tracked_paths():
                self._notifier.watch(
                    path,
                    mask=mask,
                    callbacks=[self._on_dir_modified],
                )
        except inotify.INotifyError as e:
            self.sendLine(
                "Couldn't register inotify watcher, library raised an "
                f"exception: {e})"
            )
            self._notifier.loseConnection()
            # Forget the closed notifier so later commands do not reuse it.
            self._notifier = None
            self.transport.loseConnection()
        self.sendLine("")

    def _on_dir_modified(self, ignored, filepath, mask):
        """Report an inotify event for one of the watched directories.

        ``filepath`` is the path the event refers to and ``mask`` is the
        event mask, printed in human-readable form.
        """
        self.sendLine(
            f"tracked_dir_modified: {ignored} - {filepath.path} - "
            f"{inotify.humanReadableMask(mask)}"
        )

        def monkey_patch():
            # Workaround kept for reference and intentionally NOT invoked.
            if mask == inotify.IN_DELETE_SELF:
                # twisted will, in this case, forcibly call _rmWatch()
                # (equivalent to .ignore()), but doesn't call .loseConnection()
                self._notifier.loseConnection()
                self._notifier = None

        # To try the workaround, call monkey_patch() here.
        self.sendLine("")

    def _unregister_notifier(self):
        """Stop watching both directories and close the notifier.

        A path that is no longer watched is reported and skipped so the
        notifier is always closed.
        """
        if self._notifier is None:
            self.sendLine("Nothing registered, doing nothing...")
        else:
            for path in self._tracked_paths():
                self.sendLine(f"Unregistering {path}")
            try:
                for path in self._tracked_paths():
                    try:
                        self._notifier.ignore(path)
                    except KeyError:
                        self.sendLine(f"{path} is not watched, skipping")
            finally:
                # Always release the inotify descriptor, even if ignore()
                # failed in an unexpected way.
                self._notifier.loseConnection()
                self._notifier = None
        self.sendLine("")

    def _move_dir(self):
        """Move the first scratch directory to ``move_dst``.

        Only the first directory is moved because there is a single
        destination.  (The original code referenced an undefined
        ``self._path`` attribute here.)
        """
        source = self._path0
        if source.exists():
            source.moveTo(FilePath(self.move_dst))
        else:
            self.sendLine(f'No dir "{source.path}" to move')
        self.sendLine("")

    def _remove_dir(self):
        """Remove each scratch directory that currently exists.

        (The original code referenced an undefined ``self._path`` attribute
        here.)
        """
        for path in self._tracked_paths():
            if path.exists():
                self.sendLine(f"Removing directory {path.path}")
                path.remove()
            else:
                self.sendLine(
                    f"No directory to remove at {path.path}"
                )
        self.sendLine("")

    def _number_inotify_fds(self):
        """Report how many of this process's descriptors mention inotify.

        The count is taken from the output of ``ls -l /proc/<pid>/fd``.
        Raises ``subprocess.CalledProcessError`` if ``ls`` exits non-zero.
        """
        cmd = ["/usr/bin/ls", "-l", f"/proc/{os.getpid()}/fd"]
        completed = subprocess.run(cmd, check=True, capture_output=True)
        listing = completed.stdout.decode()
        n_inotify = sum("inotify" in entry for entry in listing.splitlines())
        self.sendLine(
            f"{n_inotify} inotify file descriptors tracked from "
            f"\"{' '.join(cmd)}\""
        )
        self.sendLine("")

    def lineReceived(self, line):
        """Dispatch one received line to the matching command handler.

        Matching is case-insensitive and ignores surrounding whitespace.
        Empty or unknown input prints the list of commands.
        """
        handlers = {
            "register": self._register_notifier,
            "unregister": self._unregister_notifier,
            "status": self._number_inotify_fds,
            "move_dir": self._move_dir,
            "rmdir": self._remove_dir,
        }
        # errors="replace" keeps undecodable bytes from raising; they
        # simply fail to match any command.
        command = line.decode(errors="replace").strip().lower()
        handler = handlers.get(command)
        if handler is None:
            self.sendLine(f"Commands: {list(handlers)}\n")
            return
        handler()

    def connectionLost(self, reason):
        """Stop the reactor once the stdin connection is gone."""
        print("stopping reactor")
        if reactor.running:
            reactor.stop()
def main():
    """Attach a ``stdinProtocol`` to standard I/O and run the reactor."""
    console = stdinProtocol()
    stdio.StandardIO(console)
    reactor.run()


# Run the console only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment