To test the issue, run:
$ python3 test_1.py /abs/path/to/file/1.txt
$ python3 test_2.py /abs/path/to/file/2.txt
$ python3 test_3.py /abs/path/to/file/3.txt
$ python3 test_4.py /abs/path/to/file/4.txt
#!/usr/bin/env python3 | |
"""Read file immediately after close.""" | |
import os | |
import sys | |
import use_file | |
filename = sys.argv[1] | |
try: | |
os.remove(filename) | |
except OSError: | |
pass | |
use_file.generate(filename) | |
use_file.use_file(filename) |
#!/usr/bin/env python3 | |
"""Read file in another thread.""" | |
import os | |
import sys | |
from threading import Thread | |
import use_file | |
filename = sys.argv[1] | |
try: | |
os.remove(filename) | |
except OSError: | |
pass | |
use_file.generate(filename) | |
Thread(target=use_file.use_file, args=(filename,)).start() | |
#!/usr/bin/env python3 | |
"""Read file in another (sub)process.""" | |
import os | |
import sys | |
from subprocess import check_call | |
import use_file | |
filename = sys.argv[1] | |
try: | |
os.remove(filename) | |
except OSError: | |
pass | |
use_file.generate(filename) | |
check_call([sys.executable, 'use_file.py', filename]) | |
#!/usr/bin/env python3 | |
"""Read file both in another thread and another subprocess.""" | |
import os | |
import sys | |
from subprocess import check_call | |
from threading import Thread | |
import use_file | |
filename = sys.argv[1] | |
try: | |
os.remove(filename) | |
except OSError: | |
pass | |
use_file.generate(filename) | |
t = Thread(target=check_call, args=([sys.executable, 'use_file.py', filename],)) | |
t.start() |
#!/usr/bin/env python3 | |
import itertools | |
import os | |
from functools import partial | |
def use_file(filename): | |
assert os.path.getsize(filename) == getsize(filename) != 0 | |
def getsize(filename, chunksize=1<<15): | |
with open(filename, 'rb') as file: | |
return sum(map(len, iter(partial(file.read, chunksize), b''))) | |
def generate(filename, pattern=b"abcd"*(1<<18), repeat=10000): | |
with open(filename, 'wb') as file: | |
for chunk in itertools.repeat(pattern, repeat): | |
file.write(chunk) | |
if __name__ == "__main__": | |
import sys | |
use_file(sys.argv[1]) |