-
-
Save ChaoticMind/3a0683038c156904698f0455271149cc to your computer and use it in GitHub Desktop.
Helper to debug https://github.com/twisted/twisted/commit/073da70d41d7c6a5980f8c6807b5f0afaef04c8e#commitcomment-133586418
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import subprocess | |
from twisted.internet import reactor, inotify, stdio | |
from twisted.protocols import basic | |
from twisted.python.filepath import FilePath | |
class overriddenInotify(inotify.INotify): | |
def _rmWatch(self, wd): | |
print("called _rmWatch...") | |
super()._rmWatch(wd) | |
def connectionLost(self, reason): | |
print("closing inotify file descriptor") | |
super().connectionLost(reason) | |
class stdinProtocol(basic.LineReceiver): | |
delimiter = b"\n" | |
directory0 = "/tmp/deleteme0" | |
directory1 = "/tmp/deleteme1" | |
move_dst = "/tmp/deleteme2" | |
_path0 = None | |
_path1 = None | |
_notifier = None | |
def __init__(self): | |
self._path0 = FilePath(self.directory0) | |
self._path1 = FilePath(self.directory1) | |
# self._init_notifier() | |
def create_or_recreate_if_exists(self): | |
if self._path0.exists(): | |
print(f"Removing directory: {self.directory0}") | |
self._path0.remove() | |
if self._path1.exists(): | |
print(f"Removing directory: {self.directory1}") | |
self._path1.remove() | |
print(f"Creating directory: {self.directory0}") | |
print(f"Creating directory: {self.directory1}") | |
self._path0.makedirs() | |
self._path1.makedirs() | |
def connectionMade(self): | |
self.sendLine(f"[pid: {os.getpid()}] Connected to stdin...") | |
self.lineReceived(b"") | |
def sendLine(self, line): | |
super().sendLine(str(line).encode()) | |
def _init_notifier(self): | |
if self._notifier: | |
# self._notifier.loseConnection() | |
pass | |
else: | |
self._notifier = overriddenInotify() | |
self._notifier.startReading() | |
def _register_notifier(self): | |
self.create_or_recreate_if_exists() | |
self._init_notifier() | |
mask = ( | |
inotify.IN_WATCH_MASK | |
) # inotify.IN_CHANGED | inotify.IN_CREATE | |
mask ^= inotify.IN_DELETE_SELF # exclude IN_DELETE_SELF events | |
self.sendLine(f"Registering notifier on {self._path0.path}") | |
self.sendLine(f"Registering notifier on {self._path1.path}") | |
try: | |
self._notifier.watch( | |
self._path0, | |
mask=mask, | |
callbacks=[self._on_dir_modified], | |
) | |
self._notifier.watch( | |
self._path1, | |
mask=mask, | |
callbacks=[self._on_dir_modified], | |
) | |
except inotify.INotifyError as e: | |
self.sendLine( | |
"Couldn't register inotify watcher, library raised an " | |
f"exception: {e})" | |
) | |
self._notifier.loseConnection() | |
self.transport.loseConnection() | |
self.sendLine("") | |
def _on_dir_modified(self, ignored, filepath, mask): | |
self.sendLine( | |
f"tracked_dir_modified: {ignored} - {filepath.path} - " | |
f"{inotify.humanReadableMask(mask)}" | |
) | |
def monkey_patch(): | |
if mask == inotify.IN_DELETE_SELF: | |
# twisted will, in this case, forcibly call _rmWatch() | |
# (equivalent to .ignore()), but doesn't call .loseConnection() | |
self._notifier.loseConnection() | |
self._notifier = None | |
# self.sendLine(ignored) | |
# self.sendLine(filepath.path) | |
# self.sendLine(inotify.humanReadableMask(mask)) | |
# monkey_patch() | |
self.sendLine("") | |
def _unregister_notifier(self): | |
if self._notifier: | |
self.sendLine(f"Unregistering {self._path0}") | |
self.sendLine(f"Unregistering {self._path1}") | |
self._notifier.ignore(self._path0) | |
self._notifier.ignore(self._path1) | |
self._notifier.loseConnection() | |
self._notifier = None | |
else: | |
self.sendLine("Nothing registered, doing nothing...") | |
self.sendLine("") | |
def _move_dir(self): | |
if self._path.exists(): | |
self._path.moveTo(FilePath(self.move_dst)) | |
else: | |
self.sendLine(f'No dir "{self._path.path}" to move') | |
self.sendLine("") | |
def _remove_dir(self): | |
if self._path.exists(): | |
self.sendLine(f"Removing directory {self._path.path}") | |
self._path.remove() | |
else: | |
self.sendLine( | |
f"No directory to remove at {self._path.path}" | |
) | |
self.sendLine("") | |
def _number_inotify_fds(self): | |
cmd = ["/usr/bin/ls", "-l", f"/proc/{os.getpid()}/fd"] | |
completed = subprocess.run(cmd, check=True, capture_output=True) | |
stdout = completed.stdout.decode() | |
n_inotify = 0 | |
for line in stdout.split("\n"): | |
if "inotify" in line: | |
n_inotify += 1 | |
self.sendLine( | |
f"{n_inotify} inotify file descriptors tracked from " | |
f"\"{' '.join(cmd)}\"" | |
) | |
self.sendLine("") | |
def lineReceived(self, line): | |
line = line.decode().lower() | |
if not line: | |
self.sendLine( | |
f'Commands: {["register", "unregister", "status", "move_dir", "rmdir"]}\n' | |
) | |
return | |
if line == "register": | |
self._register_notifier() | |
elif line == "unregister": | |
self._unregister_notifier() | |
elif line == "status": | |
self._number_inotify_fds() | |
elif line == "move_dir": | |
self._move_dir() | |
elif line == "rmdir": | |
self._remove_dir() | |
else: | |
self.lineReceived(b"") | |
def connectionLost(self, reason): | |
print("stopping reactor") | |
if reactor.running: | |
reactor.stop() | |
def main(): | |
stdio.StandardIO(stdinProtocol()) | |
reactor.run() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment