Skip to content

Instantly share code, notes, and snippets.

@danizen
Last active December 31, 2019 22:09
Show Gist options
  • Save danizen/744d4dca54266e8c951e0ba9dc9536c5 to your computer and use it in GitHub Desktop.
Save danizen/744d4dca54266e8c951e0ba9dc9536c5 to your computer and use it in GitHub Desktop.
An excerpt that shows how to do multiprocessing
import os
import multiprocessing
import subprocess
import sys
import traceback
def make_unreadable_file(path):
os.chmod(path, 0o000)
if sys.platform == "win32":
# Once we drop PY2 we can use `os.getlogin()` instead.
username = os.environ["USERNAME"]
# Remove "Read Data/List Directory" permission for current user, but
# leave everything else.
args = ["icacls", path, "/deny", username + ":(RD)"]
result = subprocess.check_call(args)
print('icacls result', result)
def external_file_opener(conn):
"""
This external process is run with multiprocessing.
It waits for a path from the parent, opens it, and then wait for another message before closing it.
:param conn: bi-directional pipe
:return: nothing
"""
f = None
try:
# Wait for parent to send path
msg = conn.recv()
if msg is True:
# Do nothing - we have been told to exit without a path or action
pass
else:
path, action = msg
# Open the file
try:
f = open(path, 'r')
if action == 'lock':
lock_action(f)
elif action == 'noaccess':
make_unreadable_file(path)
except OSError as e:
traceback.print_exc(None, sys.stderr)
# Indicate the file is opened
conn.send(True)
# Now path is open and we wait for signal to exit
conn.recv()
finally:
if f:
f.close()
conn.close()
class FileOpener(object):
def __init__(self, path=None, action=None):
self.path = None
self.conn, child_conn = multiprocessing.Pipe()
self.child = multiprocessing.Process(target=external_file_opener, daemon=True, args=(child_conn,))
self.child.start()
if path:
self.send(path, action)
def send(self, path, action=None):
if self.path is not None:
raise AttributeError('path may only be set once')
self.path = str(path)
self.conn.send( (str(path), action) )
return self.conn.recv()
def cleanup(self):
# send a message to the child to exit; it will exit whether the path was sent or not
self.conn.send(True)
self.child.join()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment