Skip to content

Instantly share code, notes, and snippets.

@nijave
Created December 28, 2022 18:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nijave/c68382752143597472512aab101f6fa5 to your computer and use it in GitHub Desktop.
Save nijave/c68382752143597472512aab101f6fa5 to your computer and use it in GitHub Desktop.
Directory stats watcher. Collect file count and file size continually using inotify
#!/usr/bin/env python3
"""
Use inotify to monitor the file count and size of a given directory.
There may be small race conditions that lead to count/size errors
so output should be considered a close estimate.
* Requires only Python 3 stdlib & Linux
"""
import argparse
import ctypes
import ctypes.util
import io
import logging
import os
import pathlib
import select
import shutil
import struct
import sys
import time
import typing
# inotify event mask bits this script subscribes to or checks for.
IN_CREATE = 1 << 8  # 0x00000100
IN_DELETE = 1 << 9  # 0x00000200
IN_Q_OVERFLOW = 1 << 14  # 0x00004000

# struct layout of the fixed-size event header unpacked in process_events:
# one signed int followed by three unsigned ints.
_HEADER_STRUCT_FORMAT = "iIII"

# All log output goes to stderr; the logger is named after the script path.
logging.basicConfig(stream=sys.stderr)
log = logging.getLogger(sys.argv[0])
def _check_non_negative(result: int):
if result == -1:
raise RuntimeError("inotify call failed: " + ctypes.get_errno())
return result
def initial_scan() -> typing.Dict[str, int]:
    """Build the starting name -> size map for the current working directory.

    Every entry matched by ``*`` in the current directory is stat'ed and
    recorded under its bare name. An entry that disappears between being
    listed and being stat'ed is skipped with a warning.

    Returns:
        Mapping of entry name to its ``st_size`` in bytes.
    """
    sizes: typing.Dict[str, int] = {}
    for entry in pathlib.Path(".").glob("*"):
        try:
            size = entry.stat().st_size
        except FileNotFoundError:
            # Lost the race with a concurrent delete; leave it out of the map.
            log.warning(
                "type=initial-scan-warning detail=file-deleted-during-scan filename=%s",
                entry.name,
            )
            continue
        sizes[entry.name] = size
    return sizes
def process_events(event_queue, out_map) -> None:
    """Read one inotify event from ``event_queue`` and apply it to ``out_map``.

    Args:
        event_queue: Binary file-like object positioned at the start of an
            event. ``read`` is called once for the fixed-size header and
            once for the name that follows it.
        out_map: Mutable mapping of file name to size in bytes; updated in
            place.

    Behaviour by event mask (compared for exact equality, so an event whose
    mask carries any additional bits matches none of the branches):
        * ``IN_DELETE``: the name is removed from ``out_map``; a warning is
          logged if it was not present.
        * ``IN_CREATE``: the name is stat'ed relative to the current working
          directory and its size stored; a warning is logged if the file is
          already gone.
        * ``IN_Q_OVERFLOW``: an error is logged; ``out_map`` is not changed.
    """
    header_size = struct.calcsize(_HEADER_STRUCT_FORMAT)
    event_header_bytes = event_queue.read(header_size)
    _, mask, _, length = struct.unpack(_HEADER_STRUCT_FORMAT, event_header_bytes)
    # The name field is NUL-padded up to `length` bytes. os.fsdecode decodes
    # it the same way pathlib decodes names in initial_scan, so keys agree
    # and a name that is not valid UTF-8 does not raise UnicodeDecodeError.
    filename = os.fsdecode(event_queue.read(length).rstrip(b"\0"))
    log.debug(
        "type=inotify-event mask=%s length=%s filename=%s",
        hex(mask),
        length,
        filename,
    )
    if mask == IN_DELETE:
        try:
            del out_map[filename]
        except KeyError:
            log.warning(
                "type=inotify-warning detail=file-not-found-in-map filename=%s",
                filename,
            )
    elif mask == IN_CREATE:
        path = pathlib.Path(filename)
        try:
            out_map[path.name] = path.stat().st_size
        except FileNotFoundError:
            # The file was removed between the create event and the stat.
            # Skip it (as initial_scan does) instead of killing the watcher.
            log.warning(
                "type=inotify-warning detail=file-deleted-before-stat filename=%s",
                filename,
            )
    elif mask == IN_Q_OVERFLOW:
        log.error(
            'type=inotify-error detail=inotify-event-queue-overflow help="consider increasing fs.inotify.max_queued_events"'
        )
def _write_stats(stats_file: str, file_map) -> None:
    """Write "<file count>\\t<total bytes>" to ``stats_file``.

    The text is written to ``<stats_file>.new`` first and then moved over
    ``stats_file``, so the final path is replaced rather than rewritten
    in place.
    """
    temp_name = f"{stats_file}.new"
    file_count = str(len(file_map))
    file_bytes = str(sum(file_map.values()))
    with open(temp_name, "w") as out_file:
        out_file.write(file_count)
        out_file.write("\t")
        out_file.write(file_bytes)
    log.debug("type=file-details files=%s bytes=%s", file_count, file_bytes)
    shutil.move(temp_name, stats_file)


def main(stats_file: str, update_interval: float):
    """Watch the current working directory and keep ``stats_file`` updated.

    Sets up an inotify watch for create/delete events on the current
    directory, takes an initial scan, then loops forever applying events
    to the in-memory map and rewriting ``stats_file``.

    Args:
        stats_file: Path of the file that receives "<count>\\t<bytes>".
        update_interval: Minimum number of seconds between two writes of
            ``stats_file``. With a positive value the file is refreshed on
            that schedule even when no events arrive; with a value <= 0 it
            is rewritten after every batch of events.

    Raises:
        RuntimeError: Propagated from ``_check_non_negative`` when an
            inotify call returns -1.
    """
    read_size = 64 * 1024
    directory = pathlib.Path(".").resolve()

    # use_errno=True makes ctypes capture errno for each call, which is what
    # ctypes.get_errno() in _check_non_negative reads back.
    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
    libc.inotify_init.argtypes = []
    libc.inotify_init.restype = _check_non_negative
    libc.inotify_add_watch.argtypes = [
        ctypes.c_int,
        ctypes.c_char_p,
        ctypes.c_uint32,
    ]
    libc.inotify_add_watch.restype = _check_non_negative

    log.info("type=inotify-init detail=initializing")
    fd = libc.inotify_init()
    log.info("type=inotify-init fd=%s", str(fd))
    try:
        watch_id = libc.inotify_add_watch(
            fd,
            str(directory).encode("utf-8"),
            IN_CREATE | IN_DELETE,
        )
        log.info("type=inotify-init watch=%d", watch_id)

        # Scan after the watch exists so changes made during the scan are
        # still delivered as events.
        file_map = initial_scan()
        last_update = None
        while True:
            now = time.monotonic()
            if last_update is None or now - last_update >= update_interval:
                _write_stats(stats_file, file_map)
                last_update = now = time.monotonic()

            # Wait for events only until the next write is due; a blocking
            # read would leave the stats file stale (or never written) while
            # the directory is idle.
            if update_interval > 0:
                timeout = max(0.0, update_interval - (now - last_update))
            else:
                timeout = None
            readable, _, _ = select.select([fd], [], [], timeout)
            if not readable:
                continue

            # One read may return several events; feed them one at a time
            # to process_events through an in-memory buffer.
            chunk = os.read(fd, read_size)
            events = io.BytesIO(chunk)
            while events.tell() < len(chunk):
                process_events(events, file_map)
    finally:
        os.close(fd)
if __name__ == "__main__":
    # Command-line entry point: parse options, switch into the watched
    # directory and run the watcher loop.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-l",
        "--log-level",
        default="info",
        help="Log level (error, warning, info, debug)",
    )
    parser.add_argument(
        "-d", "--directory", required=True, help="Directory to watch stats for"
    )
    parser.add_argument(
        "-i",
        "--report-interval",
        type=float,
        default=5,
        help="How often to update stats file (in seconds)",
    )
    parser.add_argument(
        "-o",
        "--output-file",
        default="/tmp/directory-stats-watcher.txt",
        help="File to write stats to",
    )
    args = parser.parse_args()

    # Report an unknown level name as a usage error instead of letting
    # getattr raise AttributeError with a traceback.
    level = getattr(logging, args.log_level.upper(), None)
    if not isinstance(level, int):
        parser.error(f"invalid log level: {args.log_level!r}")
    log.setLevel(level)

    watch_path = pathlib.Path(args.directory).resolve()
    if not watch_path.is_dir():
        parser.error(f"not a directory: {args.directory!r}")

    # Make the output path absolute before changing directory; otherwise a
    # relative --output-file would be created inside the watched directory
    # and its own writes would show up as events.
    stats_path = os.path.abspath(args.output_file)

    log.info("type=startup directory=%s", watch_path)
    os.chdir(watch_path)
    main(stats_file=stats_path, update_interval=args.report_interval)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment