Created
December 28, 2022 18:19
-
-
Save nijave/c68382752143597472512aab101f6fa5 to your computer and use it in GitHub Desktop.
Directory stats watcher. Collect file count and file size continually using inotify
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Use inotify to monitor the file count and total file size of a given directory.

Small race conditions may lead to count/size errors, so the output should be
considered a close estimate.

* Requires only the Python 3 stdlib & Linux
"""
import argparse | |
import ctypes | |
import ctypes.util | |
import logging | |
import os | |
import pathlib | |
import shutil | |
import struct | |
import sys | |
import time | |
import typing | |
# inotify event mask bits used by this script.
IN_CREATE = 1 << 8  # 0x00000100
IN_DELETE = 1 << 9  # 0x00000200
IN_Q_OVERFLOW = 1 << 14  # 0x00004000

# struct format used to unpack the fixed-size header that precedes each
# event's name: one signed int followed by three unsigned ints.
_HEADER_STRUCT_FORMAT = "iIII"

# Logger named after the invoked script; records go to stderr.
log = logging.getLogger(sys.argv[0])
logging.basicConfig(stream=sys.stderr)
def _check_non_negative(result: int) -> int:
    """Validate the integer return value of an inotify libc call.

    Suitable as a ctypes ``restype`` callable: it receives the C ``int``
    returned by the foreign function, and whatever it returns becomes the
    result of the call.

    Args:
        result: Raw return value of the libc function.

    Returns:
        ``result`` unchanged when it is non-negative.

    Raises:
        RuntimeError: If ``result`` is negative. The message carries the
            errno value ctypes recorded and its textual description.
    """
    if result < 0:
        # get_errno() returns an int, so it must be formatted rather than
        # concatenated to a str (which raised TypeError and hid the failure).
        # NOTE: ctypes only records errno for libraries loaded with
        # use_errno=True; otherwise this reports 0.
        errno_value = ctypes.get_errno()
        raise RuntimeError(
            f"inotify call failed: [errno {errno_value}] {os.strerror(errno_value)}"
        )
    return result
def initial_scan() -> typing.Dict[str, int]:
    """Build the starting ``name -> size`` map for the current directory.

    Every entry matched by ``glob("*")`` in ``.`` is stat'ed. An entry whose
    stat raises ``FileNotFoundError`` is logged as a warning and left out of
    the result.

    Returns:
        Mapping of entry name to its ``st_size``.
    """
    sizes: typing.Dict[str, int] = {}
    for entry in pathlib.Path(".").glob("*"):
        name = entry.name
        try:
            size = entry.stat().st_size
        except FileNotFoundError:
            log.warning(
                "type=initial-scan-warning detail=file-deleted-during-scan filename=%s",
                name,
            )
        else:
            sizes[name] = size
    return sizes
def process_events(event_queue, out_map) -> None:
    """Read one inotify event from ``event_queue`` and apply it to ``out_map``.

    Args:
        event_queue: Binary file object wrapping the inotify descriptor. One
            fixed-size header is read, followed by ``length`` bytes of name.
        out_map: ``name -> size`` mapping, updated in place.

    Masks are compared by exact equality, so an event whose mask carries any
    additional bit (for example directory events, which inotify(7) documents
    as tagged with IN_ISDIR) matches no branch and is ignored.

    Raises:
        struct.error: If fewer header bytes than expected could be read.
    """
    event_header_bytes = event_queue.read(struct.calcsize(_HEADER_STRUCT_FORMAT))
    # Per inotify(7) the header fields are: wd, mask, cookie, len.
    _, mask, _, length = struct.unpack(_HEADER_STRUCT_FORMAT, event_header_bytes)
    # The name is NUL padded. os.fsdecode decodes it the way pathlib decodes
    # directory entries, so keys agree with the ones produced by
    # initial_scan() and undecodable bytes no longer raise UnicodeDecodeError.
    filename = os.fsdecode(event_queue.read(length).rstrip(b"\0"))
    log.debug(
        "type=inotify-event mask=%s length=%s filename=%s",
        hex(mask),
        length,
        filename,
    )
    if mask == IN_DELETE:
        try:
            del out_map[filename]
        except KeyError:
            log.warning(
                "type=inotify-warning detail=file-not-found-in-map filename=%s",
                filename,
            )
    elif mask == IN_CREATE:
        # The name is resolved relative to the current working directory.
        path = pathlib.Path(filename)
        try:
            out_map[path.name] = path.stat().st_size
        except FileNotFoundError:
            # The file can vanish between the event and the stat; treat it
            # like initial_scan() does instead of crashing the watcher.
            log.warning(
                "type=inotify-warning detail=file-deleted-before-stat filename=%s",
                filename,
            )
    elif mask == IN_Q_OVERFLOW:
        log.error(
            'type=inotify-error detail=inotify-event-queue-overflow help="consider increasing fs.inotify.max_queued_events"'
        )
def _write_stats(stats_file: str, file_map: typing.Dict[str, int]) -> None:
    """Write ``"<file count>\\t<total bytes>"`` to ``stats_file``.

    The text is written to ``<stats_file>.new`` first and then moved over
    ``stats_file``.
    """
    file_count = str(len(file_map))
    file_bytes = str(sum(file_map.values()))
    temp_path = f"{stats_file}.new"
    with open(temp_path, "w") as out_file:
        out_file.write(f"{file_count}\t{file_bytes}")
    log.debug("type=file-details files=%s bytes=%s", file_count, file_bytes)
    shutil.move(temp_path, stats_file)


def main(stats_file: str, update_interval: float) -> None:
    """Watch the current working directory and keep ``stats_file`` updated.

    Sets up an inotify watch for create/delete events on the resolved current
    directory, scans the directory once, writes the initial stats, then
    processes events forever.

    Args:
        stats_file: Path of the file that receives ``count<TAB>bytes``.
            NOTE(review): a relative path resolves inside the watched
            directory, so the temporary ``.new`` file would itself generate
            events; prefer an absolute path elsewhere.
        update_interval: Minimum number of seconds between stats writes.

    Raises:
        RuntimeError: Propagated from the ``restype`` checker when
            ``inotify_init`` or ``inotify_add_watch`` fails.

    The stats file is only rewritten after an event has been processed and
    the interval has elapsed; presumably the read in ``process_events`` blocks
    while the directory is idle, so no refresh happens during that time.
    """
    directory = pathlib.Path(".").resolve()

    # use_errno=True makes ctypes record errno for these calls, so
    # ctypes.get_errno() in the result checker reports the real error.
    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
    libc.inotify_init.argtypes = []
    libc.inotify_init.restype = _check_non_negative
    libc.inotify_add_watch.argtypes = [
        ctypes.c_int,
        ctypes.c_char_p,
        ctypes.c_uint32,
    ]
    libc.inotify_add_watch.restype = _check_non_negative

    log.info("type=inotify-init detail=initializing")
    fd = libc.inotify_init()
    log.info("type=inotify-init fd=%s", str(fd))
    watch_id = libc.inotify_add_watch(
        fd,
        str(directory).encode("utf-8"),
        IN_CREATE | IN_DELETE,
    )
    log.info("type=inotify-init watch=%d", watch_id)

    # The watch is registered before scanning, so changes made during the
    # scan are queued and applied afterwards.
    file_map = initial_scan()

    # Publish the scan result right away instead of waiting for the first
    # event, which may never come on an idle directory.
    _write_stats(stats_file, file_map)
    # A monotonic clock keeps the throttle immune to wall-clock adjustments.
    last_update = time.monotonic()

    with open(fd, "rb") as inotify_events:
        while True:
            process_events(inotify_events, file_map)
            if time.monotonic() - last_update >= update_interval:
                _write_stats(stats_file, file_map)
                last_update = time.monotonic()
if __name__ == "__main__":
    # Command-line entry point: parse options, configure logging, switch to
    # the watched directory and run the watcher loop.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-l",
        "--log-level",
        default="info",
        help="Log level (error, warning, info, debug)",
    )
    parser.add_argument(
        "-d", "--directory", required=True, help="Directory to watch stats for"
    )
    parser.add_argument(
        "-i",
        "--report-interval",
        type=float,
        default=5,
        help="How often to update stats file (in seconds)",
    )
    parser.add_argument(
        "-o",
        "--output-file",
        default="/tmp/directory-stats-watcher.txt",
        help="File to write stats to",
    )
    args = parser.parse_args()

    # Look the name up among the logging module's integer level constants and
    # report an unknown name as a usage error instead of an AttributeError
    # traceback.
    log_level = getattr(logging, args.log_level.upper(), None)
    if not isinstance(log_level, int):
        parser.error(f"invalid log level: {args.log_level!r}")
    log.setLevel(log_level)

    watch_path = pathlib.Path(args.directory).resolve()
    log.info("type=startup directory=%s", watch_path)
    # main() watches and scans ".", so the process must run inside the
    # watched directory.
    os.chdir(watch_path)
    main(stats_file=args.output_file, update_interval=args.report_interval)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment