Created
March 18, 2020 15:14
-
-
Save gndu91/77397129b2cfc52ab30978983ecb2ac0 to your computer and use it in GitHub Desktop.
Watch for mounted/unmounted partitions.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import enum | |
import hashlib | |
import os | |
import threading | |
import time | |
from ast import literal_eval | |
from functools import partial | |
import psutil | |
class Event(enum.Enum): | |
MOUNTED = 0 | |
UNMOUNTED = 1 | |
REMOUNTED = 2 | |
# This can be used like this: | |
# @watch_mount | |
# def f(event, *args); | |
# or like this: | |
# @watch_mount(watch=Event.MOUNTED) | |
# def f(*args): pass | |
# The behaviour of cases like | |
# watch_mount(watch=Event.MOUNTED)(watch=None)() | |
# should in any case be relied | |
# upon | |
# Note: this might be redesigned to have only | |
# one thread for a lot of callbacks. | |
# This is not the priority. | |
def watch_mount(callback=None, *, watch=None, only_partitions=True): | |
""" | |
:param callback: The function called | |
in a daemon thread each time an | |
event in the watchlist happens. | |
:param watch: A list of events to watch. | |
Leave this to None to watch all the events. | |
:param only_partitions: Only takes into account | |
the partitions having a defined uuid. | |
:return: | |
""" | |
if os.name != 'posix': | |
raise NotImplementedError(os.name) | |
if not os.path.exists('/proc/mounts'): | |
raise NotImplementedError('/proc/mounts was not found') | |
if only_partitions and not os.path.exists('/dev/disk/by-partuuid'): | |
raise NotImplementedError('/dev/disk/by-partuuid was not found') | |
# Allow passing parameters | |
# before passing the | |
# callback. | |
if callback is None: | |
return partial( | |
watch_mount, | |
watch=watch, | |
only_partitions=only_partitions) | |
if watch is not None: | |
if isinstance(watch, Event): | |
watch = (watch,) | |
elif isinstance(watch, int): | |
watch = (Event(watch),) | |
else: | |
watch = tuple(map(Event, watch)) | |
if not callable(callback): | |
raise RuntimeError( | |
'{!r} is not a valid callable'.format(callback)) | |
def get_sum(): | |
content = open('/proc/mounts', 'rb').read() | |
content_sum = hashlib.sha256(content).digest() | |
return content.decode('utf-8'), content_sum | |
current = dict() | |
def _thread(): | |
# The checksum is the current | |
# way to see if something | |
# has changed. | |
# I tried to see if the mtime | |
# was working, however it | |
# could not be relied upon | |
# because it has both false | |
# positives and false negatives. | |
# I also tried to use Watchdog, | |
# but it was never triggered. | |
current_sum = None | |
while True: | |
content, new_sum = get_sum() | |
if new_sum != current_sum: | |
devices = psutil.disk_partitions() | |
devices = list(i for i in devices if os.path.exists(i.mountpoint)) | |
if only_partitions: | |
items = list(os.scandir('/dev/disk/by-partuuid')) | |
f = (lambda x: os.path.abspath(os.path.join( | |
os.path.dirname(x), os.readlink(x)))) | |
items = list(f(i.path) for i in items) | |
devices = list(i for i in devices if i.device in items) | |
new = [i for i in devices if i.device not in current] | |
old = [current[i] for i in current if all( | |
device.device != i for device in devices)] | |
edited = [ | |
i for i in devices if | |
i.device in current | |
and i.mountpoint != current[i.device].mountpoint] | |
for disk in new: | |
if watch is None or Event.MOUNTED in watch: | |
callback(Event.MOUNTED, disk) | |
current[disk.device] = disk | |
for disk in old: | |
if watch is None or Event.UNMOUNTED in watch: | |
callback(Event.UNMOUNTED, disk) | |
current.pop(disk.device) | |
for disk in edited: | |
if watch is None or Event.REMOUNTED in watch: | |
callback(Event.REMOUNTED, disk) | |
current[disk.device] = disk | |
current_sum = new_sum | |
time.sleep(1) | |
threading.Thread( | |
target=_thread, | |
daemon=True | |
).start() | |
return lambda: current.copy() | |
f = watch_mount(print) | |
@watch_mount | |
def log(event, disk): | |
print('Event got', event, disk) | |
time.sleep(3) | |
print('All the devices connected (print):', f()) | |
print('All the devices connected (log):', log()) | |
input('Type enter to close') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
psutil |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment