Skip to content

Instantly share code, notes, and snippets.

@juntalis
Last active April 3, 2018 05:39
Show Gist options
  • Save juntalis/87c8cd049092dd5e169d134ebd8112d4 to your computer and use it in GitHub Desktop.
Save juntalis/87c8cd049092dd5e169d134ebd8112d4 to your computer and use it in GitHub Desktop.
Python 3 only - cba to fuck with compatibility shims.
# encoding: utf-8
import os
import io
import errno
import array
import weakref
import tempfile
import functools
# Simplified vs the stdlib code: the private helpers are borrowed straight
# from ``tempfile`` instead of being re-implemented here.
_mkstemp_inner = tempfile._mkstemp_inner
_sanitize_params = tempfile._sanitize_params
_TemporaryFileWrapper = tempfile._TemporaryFileWrapper

# ``os.TMP_MAX`` where the platform defines it, otherwise 10000.
TMP_MAX = getattr(os, 'TMP_MAX', 10000)

# Default name prefix.
template = 'tmp'

# Flags used when creating the file: read/write, create, fail if it already
# exists. O_NOFOLLOW and O_BINARY are only added where ``os`` defines them.
_text_openflags = (
    os.O_RDWR
    | os.O_CREAT
    | os.O_EXCL
    | getattr(os, 'O_NOFOLLOW', 0)
)
_bin_openflags = _text_openflags | getattr(os, 'O_BINARY', 0)
###############
# Initial POC #
###############
# Typical usage would be:
#
# >>> with NamedTemporaryFile() as fp:
# ...
# ... # user code builds the contents of fp.
# ... ...
# ... # A call is made to a reader function, which reopens the file itself.
# ... some_reader(fp)
# ...
# >>> def some_reader(fp):
# ... with fp.reopen(mode='rb') as fpin:
# ... # stuff
#
# EDIT: May have broken it when I started down the path of
# the second approach. Originally there was no tracking
# of duplicated file descriptors - it just expected that
# all calls to ``reopen`` would close the resulting handles
# before the ``TemporaryFileWrapper`` instance was closed.
class TemporaryFileWrapper(_TemporaryFileWrapper):
    """
    Temporary file wrapper that can hand out additional file objects.

    Extends ``tempfile._TemporaryFileWrapper`` with :meth:`reopen`, which
    duplicates the original file descriptor with ``os.dup`` and wraps the
    duplicate in a new file object. Every duplicated descriptor is recorded
    in :attr:`open_fds` until its file object is garbage collected or the
    descriptor is dropped with :meth:`remove_fd`.

    NOTE(review): closing the wrapper does not close file objects returned
    by :meth:`reopen`; callers are expected to close those themselves.

    :type init_fd: int
    :type open_fds: set[int]
    :type open_kwargs: dict
    """

    def __init__(self, fd, name, mode='w+b', buffering=-1, encoding=None,
                 newline=None, delete=True):
        """
        Wrap an already opened file descriptor.

        :param fd: File descriptor of the temporary file
        :type fd: int
        :param name: Name of the temporary file
        :type name: str
        :param mode: File mode passed to ``io.open``
        :type mode: str
        :param buffering: Buffering policy passed to ``io.open``
        :type buffering: int
        :param encoding: Encoding passed to ``io.open``
        :type encoding: str | None
        :param newline: Newline setting passed to ``io.open``
        :type newline: str | None
        :param delete: Forwarded to the base wrapper
        :type delete: bool
        """
        # Store init_fd & initial ``io.open`` kwargs so that ``reopen`` can
        # reuse them as defaults.
        self.init_fd = fd
        self.open_fds = set()
        # fd -> weak reference to the file object that currently owns it.
        # Needed because fd *numbers* are recycled by the OS as soon as a
        # file is closed, possibly before the old file object is collected.
        self._fd_owners = {}
        self.open_kwargs = dict(mode=mode, buffering=buffering,
                                newline=newline, encoding=encoding)
        # Create a file object from the init_fd.
        file = io.open(fd, **self.open_kwargs)
        super(TemporaryFileWrapper, self).__init__(file, name, delete=delete)

    def on_release(self, fd):
        """
        Called when an associated file object is deleted/GCed.

        :param fd: File descriptor of the file object that was gced
        :type fd: int
        """
        owner = self._fd_owners.get(fd)
        if owner is not None and owner() is not None:
            # The fd number has been re-registered for a file object that is
            # still alive: this notification belongs to an older file that
            # used the same number, so it must not drop the live entry.
            return
        self.remove_fd(fd, False)

    def add_fd(self, fd, file):
        """
        Given a newly created file object and its corresponding
        file descriptor, add a new reference to track.

        :param fd: File descriptor
        :type fd: int
        :param file: file object
        :type file: file
        """
        owner = self._fd_owners.get(fd)
        if owner is not None and owner() is file:
            # This exact pair is already tracked.
            return
        self.open_fds.add(fd)
        self._fd_owners[fd] = weakref.ref(file)
        weakref.finalize(file, self.on_release, fd)

    def remove_fd(self, fd, close=True):
        """
        Stop tracking a file descriptor, optionally closing it.
        Unknown descriptors are ignored.

        :param fd: File descriptor
        :type fd: int
        :param close: Whether or not to close the fd
        :type close: bool
        """
        if fd not in self.open_fds:
            return
        self.open_fds.remove(fd)
        self._fd_owners.pop(fd, None)
        if close:
            os.close(fd)

    def reopen(self, **kwargs):
        """
        Opens a new file object to the temporary file. Any unspecified parameter
        reuses the original values from the constructor. See `io.open` for detailed
        info on the parameters. ``closefd`` is always forced to ``True`` so the
        returned file object owns the duplicated descriptor.

        NOTE(review): the new descriptor comes from ``os.dup``; duplicated
        descriptors normally share the file position with the original, so
        readers may need to seek - confirm against the intended usage.

        :param mode: File mode to open in
        :type mode: str | None
        :param buffering: Buffering policy
        :type buffering: int | None
        :param encoding: Name of encoding to use
        :type encoding: str | None
        :param newline: Newline character
        :type newline: str | None
        :return: Newly created file object
        :rtype: file
        """
        dup_fd = os.dup(self.init_fd)
        open_kwargs = dict(self.open_kwargs, **kwargs)
        open_kwargs['closefd'] = True
        try:
            # noinspection PyTypeChecker
            file = io.open(dup_fd, **open_kwargs)
        except BaseException:
            # Don't leak the duplicate. Depending on where the open failed
            # the descriptor may already have been closed, so a failing
            # close is ignored here.
            try:
                os.close(dup_fd)
            except OSError:
                pass
            raise
        self.add_fd(dup_fd, file)
        return file

    @property
    def ref_count(self):
        """
        Number of duplicated file descriptors currently being tracked.

        :rtype: int
        """
        return len(self.open_fds)
def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
                       newline=None, suffix=None, prefix=None,
                       dir=None, delete=True):
    """Create and return a temporary file.

    Arguments:
    'prefix', 'suffix', 'dir' -- as for mkstemp.
    'mode' -- the mode argument to io.open (default "w+b").
    'buffering' -- the buffer size argument to io.open (default -1).
    'encoding' -- the encoding argument to io.open (default None)
    'newline' -- the newline argument to io.open (default None)
    'delete' -- whether the file is deleted on close (default True).

    The file is created as mkstemp() would do it.

    Returns an object with a file-like interface; the name of the file
    is accessible as its 'name' attribute. The file will be automatically
    deleted when it is closed unless the 'delete' argument is set to False.

    If wrapping the freshly created file fails, the descriptor is closed and
    the file removed on a best-effort basis before the original exception
    is re-raised.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    flags = _bin_openflags
    # Setting O_TEMPORARY in the flags causes the OS to delete
    # the file when it is closed. This is only supported by Windows.
    if os.name == 'nt' and delete:
        flags |= os.O_TEMPORARY

    fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
    try:
        return TemporaryFileWrapper(fd, name, mode=mode, buffering=buffering,
                                    encoding=encoding, newline=newline,
                                    delete=delete)
    except BaseException:
        # Close before unlinking: an open file cannot always be removed, and
        # a failing unlink must not prevent the descriptor from being closed.
        # Both steps are best effort - the descriptor may already have been
        # closed by the failed open, and the file may already be gone - so
        # their errors must not mask the exception being propagated.
        for cleanup, target in ((os.close, fd), (os.unlink, name)):
            try:
                cleanup(target)
            except OSError:
                pass
        raise
####################
# Expanded Attempt #
####################
# When the initial POC worked, I started playing around with the idea of
# expanding the attempt to track the duplicated file descriptors and
# not allow the original fd to be closed until all dups were either closed
# or unused. Lost interest about halfway in when I realized how much work
# would need to be done to handle some of the potential issues with this approach.
class HandleTracker:
    """
    Tracks open file descriptors. Maintains a pool of unused file descriptors
    from past calls to `open` because that's totally not a premature optimization that
    would never see a warranting use case.

    NOTE(review): nothing in this class consumes :attr:`unused_fds` or
    enforces :attr:`max_unused` yet, and descriptors pooled after their file
    object was garbage collected may already be closed - treat the pool as
    bookkeeping only until that is sorted out.

    :type unused_fds: array.array
    :type used_fds: dict[int,weakref.ref]
    :type init_fd: int | None
    :type open_kwargs: dict
    """
    __slots__ = ('max_unused', 'used_fds', 'unused_fds',
                 'init_fd', 'open_kwargs',)

    def __init__(self, max_unused=5, init_fd=None, open_kwargs=None):
        """
        Constructor - initializes empty containers

        :param max_unused: Configure the maximum number of unused fds we can pool
        :type max_unused: int
        :param init_fd: File descriptor duplicated by :meth:`open`. May be left
            unset when only the tracking methods are used.
        :type init_fd: int | None
        :param open_kwargs: Default ``io.open`` keyword arguments for :meth:`open`
        :type open_kwargs: dict | None
        """
        self.used_fds = dict()
        self.max_unused = max_unused
        self.unused_fds = array.array('i')
        self.init_fd = init_fd
        # Copy so later changes to the caller's dict don't leak in.
        self.open_kwargs = dict(open_kwargs) if open_kwargs else {}

    def _ensure(self, fd, file=None, ensure_used=False):
        """
        Ensure that a fd is used/unused

        :param fd: File descriptor
        :type fd: int
        :param file: File object. Required to ensure unused
        :type file: file | None
        :param ensure_used: ``True`` to ensure used. ``False`` otherwise.
        :type ensure_used: bool
        :return: Whether or not it meets the ensured criteria
        :rtype: bool
        """
        if ensure_used:
            return fd in self.used_fds
        # Remaining logic ensures unused
        fileref = self.used_fds.get(fd)
        if fileref is None:
            # Not in our used dict - we're good.
            return True
        tracked = fileref()
        if tracked is None or tracked is file:
            # Either the previous owner is gone, or the fd is already
            # registered for this very file object.
            return True
        # The OS hands a closed file's fd number out again right away, so a
        # previous owner that has been closed no longer holds the fd.
        return bool(getattr(tracked, 'closed', False))

    def _remove_unused(self, fd, errors=False):
        """
        Attempt to remove a file descriptor from the :attr:`unused_fds` pool.

        :param fd: File descriptor
        :type fd: int
        :param errors: Whether or not to throw errors for unknown fds.
        :type errors: bool
        :raises KeyError: when :param:`errors` is set to ``True`` and ``fd`` is unknown.
        """
        try:
            self.unused_fds.remove(fd)
        except ValueError:
            # ``array.remove`` reports a missing value with ValueError;
            # translate it to the documented KeyError.
            if errors:
                raise KeyError('Unknown fd specified: {}'.format(fd)) from None

    def _on_delete(self, fd):
        """
        Called when an associated file object is deleted/GCed.

        :param fd: File descriptor of the file object that was gced
        :type fd: int
        """
        fileref = self.used_fds.get(fd)
        if fileref is not None and fileref() is not None:
            # The fd number now belongs to a different, still alive file
            # object; this notification is stale and must not release it.
            return
        self.release_fd(fd, errors=False)

    def use_fd(self, fd, file):
        """
        Given a newly created file object and its corresponding
        file descriptor, add a new reference to track.

        :param fd: File descriptor
        :type fd: int
        :param file: file object
        :type file: file
        :raises RuntimeError: when the specified :param:`fd` is already being used.
        """
        # If this fd is already attached to another file object, we have a problem.
        if not self._ensure(fd, file=file):
            raise RuntimeError(
                'fd attached to another file object without closing previous!'
            )
        # Add the pair to our used_fds dict.
        self.used_fds[fd] = weakref.ref(file)
        # Attach a finalizer callback to the file - just in case.
        weakref.finalize(file, self._on_delete, fd)
        # Attempt to remove the fd from our unused_fds pool.
        self._remove_unused(fd)

    def release_fd(self, fd, errors=True, close=False):
        """
        Release a used fd. Unless it is closed, the fd is moved to the
        :attr:`unused_fds` pool.

        :param fd: File descriptor
        :type fd: int
        :param errors: Should an error be thrown if fd is not known?
        :type errors: bool
        :param close: Close the fd?
        :type close: bool
        :raises KeyError: when :param:`errors` is set to ``True`` and ``fd`` is unknown.
        """
        # Ensure that this is a tracked in-use fd
        if not self._ensure(fd, ensure_used=True):
            if not errors:
                return
            raise KeyError('Unknown fd specified: {}'.format(fd))
        # Remove fd from our dict
        self.used_fds.pop(fd)
        # Branch logic depending on whether we need to close the fd.
        if close:
            os.close(fd)
            return
        self.unused_fds.append(fd)

    def open(self, **kwargs):
        """
        Opens a new file object to the temporary file. Any unspecified parameter
        reuses the original values from the constructor. See `io.open` for detailed
        info on the parameters. ``closefd`` is always forced to ``True`` so the
        returned file object owns the duplicated descriptor.

        :param mode: File mode to open in
        :type mode: str | None
        :param buffering: Buffering policy
        :type buffering: int | None
        :param encoding: Name of encoding to use
        :type encoding: str | None
        :param newline: Newline character
        :type newline: str | None
        :return: Newly created file object
        :rtype: file
        :raises RuntimeError: when no ``init_fd`` has been configured.
        """
        if self.init_fd is None:
            raise RuntimeError('no init_fd configured - nothing to duplicate')
        dup_fd = os.dup(self.init_fd)
        open_kwargs = dict(self.open_kwargs, **kwargs)
        open_kwargs['closefd'] = True
        try:
            # noinspection PyTypeChecker
            file = io.open(dup_fd, **open_kwargs)
        except BaseException:
            # Don't leak the duplicate. Depending on where the open failed
            # the descriptor may already have been closed, so a failing
            # close is ignored here.
            try:
                os.close(dup_fd)
            except OSError:
                pass
            raise
        self.use_fd(dup_fd, file)
        return file

    @property
    def open_count(self):
        """
        Number of file descriptors currently tracked as in use.

        :rtype: int
        """
        return len(self.used_fds)

    # TODO: Add a close that doesn't process until open_count == 0, at which
    # point it closes all unused fds, and init_fd.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment