Skip to content

Instantly share code, notes, and snippets.

Last active April 3, 2018 05:39
Show Gist options
  • Save juntalis/87c8cd049092dd5e169d134ebd8112d4 to your computer and use it in GitHub Desktop.
Save juntalis/87c8cd049092dd5e169d134ebd8112d4 to your computer and use it in GitHub Desktop.
Python 3 only - cba to fuck with compatibility shims.
# encoding: utf-8
import os
import io
import errno
import array
import weakref
import tempfile
import functools
# Simplified vs the stdlib code
TMP_MAX = getattr(os, 'TMP_MAX', 10000)
_text_openflags = os.O_RDWR | os.O_CREAT | os.O_EXCL \
| getattr(os, 'O_NOFOLLOW', 0)
_bin_openflags = _text_openflags | getattr(os, 'O_BINARY', 0)
template = 'tmp'
_mkstemp_inner = tempfile._mkstemp_inner
_sanitize_params = tempfile._sanitize_params
_TemporaryFileWrapper = tempfile._TemporaryFileWrapper
# Initial POC #
# Typical usage would be:
# >>> with NamedTemporaryFile() as fp:
# ...
# ... # user code builds the contents of fp.
# ... ...
# ... # call is made to a reader function. reader function
# ... some_reader(fp)
# ...
# >>> def some_reader(fp):
# ... with reopen('rb') as fpin:
# ... # stuff
# EDIT: May have broken it when I started down the path of
# the second approach. Originally there was no tracking
# of duplicated file descriptors - it just expected that
# all calls to ``reopen``would close the resulting handles
# before the ``TemporaryFileWrapper`` instance was closed.
class TemporaryFileWrapper(_TemporaryFileWrapper):
def __init__(self, fd, name, mode='w+b', buffering=-1, encoding=None,
newline=None, delete=True):
# Store init_fd & initial ```` kwargs.
self.init_fd = fd
self.open_fds = set()
self.open_kwargs = dict(mode=mode, buffering=buffering,
newline=newline, encoding=encoding)
# Create a file object from the init_fd.
file =, **self.open_kwargs)
super(TemporaryFileWrapper, self).__init__(file, name, delete=delete)
def on_release(self, fd):
Called when an associate file object is deleted/GCed.
:param fd: File descriptor of the file object that was gced
:type fd: int
self.remove_fd(fd, False)
def add_fd(self, fd, file):
Given a newely created file object and its corresponding
file descriptor, add a new reference to track.
:param fd: Filde descriptor
:type fd: int
:param file: file object
:type file: file
if fd in self.open_fds: return
weakref.finalize(file, self.on_release, fd)
def remove_fd(self, fd, close=True):
Given a newely created file object and its corresponding
file descriptor, add a new reference to track.
:param fd: Filde descriptor
:type fd: int
:param file: file object
:type file: file
:param close: Whether or not to close the fd
:type close: bool
if fd not in self.open_fds: return
if close: os.close(fd)
def reopen(self, **kwargs):
Opens a new file object to the temporary file. Any unspecified parameter
reuses the original values from the constructor. See `` for detailed
info on the parameters
:param mode: File mode to open in
:type mode: str | None
:param buffering: Buffering policy
:type buffering: int | None
:param encoding: Name of encoding to use
:type encoding: str | None
:param newline: Newline character
:type newline: str | None
:return: Newly created file object
:rtype: file
dup_fd = os.dup(self.init_fd)
open_kwargs = self.open_kwargs.copy()
# noinspection PyTypeChecker
file =, **open_kwargs)
self.add_fd(dup_fd, file)
return file
def ref_count(self):
return len(self.open_fds)
def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
newline=None, suffix=None, prefix=None,
dir=None, delete=True):
"""Create and return a temporary file.
'prefix', 'suffix', 'dir' -- as for mkstemp.
'mode' -- the mode argument to (default "w+b").
'buffering' -- the buffer size argument to (default -1).
'encoding' -- the encoding argument to (default None)
'newline' -- the newline argument to (default None)
'delete' -- whether the file is deleted on close (default True).
The file is created as mkstemp() would do it.
Returns an object with a file-like interface; the name of the file
is accessible as its 'name' attribute. The file will be automatically
deleted when it is closed unless the 'delete' argument is set to False.
flags =_bin_openflags
prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
# Setting O_TEMPORARY in the flags causes the OS to delete
# the file when it is closed. This is only supported by Windows.
if == 'nt' and delete:
flags |= os.O_TEMPORARY
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
return TemporaryFileWrapper(fd, name, mode=mode, buffering=buffering, encoding=encoding,
newline=newline, delete=delete)
except BaseException:
# Expanded Attempt #
# When the initial POC worked, I started playing around with the idea of
# expanding the attempt to track the the duplicated file descriptors and
# not allow the original fd to be closed until all dups were either closed
# or unused. Lost interest about halfway in when I realized how much wor
# would need to be done to handle some of the potential issues with this approach.
class HandleTracker:
Tracks open file descriptors. Maintains a pool of unused file descriptors
from past calls to `open` because that's totally not a premature optimizaiton that
would never see a warranting use case.
:type unused_fds: array.array
:type used_fds: dict[int,weakref.ref]
__slots__ = ('max_unused', 'used_fds', 'unused_fds',)
def __init__(self, max_unused=5):
Constructor - initializes empty containers
:param max_unused: Configure rhe maximum number of unused fds we can pool
:type max_unused: int
self.used_fds = dict()
self.max_unused = max_unused
self.unused_fds = array.array('i')
def _ensure(self, fd, file=None, ensure_used=False):
Ensure that a fd is used/unused
:param fd: File descriptor
:type fd: int
:param file: File object. Required to ensure unused
:type file: file | None
:param ensure_used: ``True`` to ensure used. ``False`` otherwise.
:type ensure_used: bool
:return: Whether or not it meeds the ensured criteria
:rtype: bool
if ensure_used: return fd in self.used_fds
# Remaining logic ensures unused
if fd in self.used_fds:
# fd exists in our used_fds dict. Check if it matches the
# specified file. If it does, we're still okay.
fileref = self.used_fds[fd]
return file is not None and fileref() is file
# Not in our used dict - we're good.
return True
def _remove_unused(self, fd, errors=False):
Attempt to remove a file descriptor from the :attr:`unused_fds` pool.
:param fd: File descriptor
:type fd: int
:param errors: Whether or not to throw errors for unknown fds.
:type errors: bool
:raises KeyError: when :param:`errors` is set to ``True`` and ``fd`` is unknown.
except KeyError:
if errors: raise
def _on_delete(self, fd):
Called when an associate file object is deleted/GCed.
:param fd: File descriptor of the file object that was gced
:type fd: int
self.release_fd(fd, errors=False)
def use_fd(self, fd, file):
Given a newely created file object and its corresponding
file descriptor, add a new reference to track.
:param fd: Filde descriptor
:type fd: int
:param file: file object
:type file: file
:raises RuntimeError: when the specified :param:`fd` is already being used.
# If this fd is already attached to another file object, we have a problem.
if not self._ensure(fd, file=file):
raise RuntimeError(
'fd attached to another file object without closing previous!'
# Add the pair to our used_fds dict.
self.used_fds[fd] = weakref.ref(file)
# Attach a finalizer callback to the file - just in case.
weakref.finalize(file, self._on_delete, fd)
# Attempt to remove the fd from our unused_fds pool.
def release_fd(self, fd, errors=True, close=False):
Release a used fd.
:param fd: Filde descriptor
:type fd: int
:param errors: Should an error be thrown if fd is not known?
:type errors: bool
:param close: Close the fd?
:type close: bool
:raises KeyError: when :param:`errors` is set to ``True`` and ``fd`` is unknown.
# Ensure that this is a tracked in-use fd
if not self._ensure(fd, ensure_used=True):
if not errors: return
raise KeyError('Unknown fd specified: {}'.format(fd))
# Remove fd from our dict
# Branch logic depending on whether we need to close the fd.
if close: return os.close(fd)
def open(self, **kwargs):
Opens a new file object to the temporary file. Any unspecified parameter
reuses the original values from the constructor. See `` for detailed
info on the parameters
:param mode: File mode to open in
:type mode: str | None
:param buffering: Buffering policy
:type buffering: int | None
:param encoding: Name of encoding to use
:type encoding: str | None
:param newline: Newline character
:type newline: str | None
:return: Newly created file object
:rtype: file
dup_fd = os.dup(self.init_fd) # init_fd not currently a thing - broken
open_kwargs = self.open_kwargs.copy()
# noinspection PyTypeChecker
file =, **open_kwargs)
self.use_fd(dup_fd, file)
return file
def open_count(self):
return len(self.used_fds)
# TODO: Add a close that doesn't process until open_count == 0, at which
# point is closes all unused fds, and init_fd.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment