@ranchodeluxe
Last active April 9, 2024 12:58
import os
import pytest
import s3fs
import fsspec
import pickle
from multiprocessing import Process, Queue

fs = fsspec.filesystem('local')


@pytest.fixture
def byte_range_filepath(tmp_path):
    output_filepath = os.path.join(str(tmp_path), 'test.txt')
    with open(output_filepath, mode="w") as f:
        for indx, i in enumerate(range(3), start=1):
            # each record is exactly 10 bytes, so the file is 30 bytes total
            f.write(f"{indx:02d}________")
    return output_filepath


def process_task(serialized_fsock: bytes, byte_range, queue):
    # rehydrate the pickled fsspec file handle inside the worker process,
    # then read only this worker's byte range
    open_fsock = pickle.loads(serialized_fsock)
    open_fsock.seek(byte_range[0])
    data = open_fsock.read(byte_range[1] - byte_range[0])
    queue.put(data)


def test_open_file_handlers(byte_range_filepath):
    byte_ranges = [(0, 10), (10, 20), (20, 30)]
    open_fsock = fs.open(byte_range_filepath, mode='rb')
    # seek the parent handle to EOF before pickling; each worker seeks to its
    # own offset after unpickling, so the parent's position should not matter
    open_fsock.seek(30)
    serialized_fsock = pickle.dumps(open_fsock)
    queue = Queue()

    jobs = []
    for byte_range in byte_ranges:
        job = Process(target=process_task, args=(serialized_fsock, byte_range, queue))
        job.start()
        jobs.append(job)

    for job in jobs:
        job.join()

    while not queue.empty():
        assert queue.get() in [b'01________', b'02________', b'03________']
ranchodeluxe commented Apr 9, 2024

A threaded version of the same test:

import os
import pytest
import s3fs
import fsspec
import pickle
from multiprocessing import Process, Queue
from threading import Thread

fs = fsspec.filesystem('local')


@pytest.fixture
def byte_range_filepath(tmp_path):
    output_filepath = os.path.join(str(tmp_path), 'test.txt')
    with open(output_filepath, mode="w") as f:
        for indx, i in enumerate(range(3), start=1): 
            # write exactly 10 bytes
            f.write(f"{indx:02d}________")
    return output_filepath


def process_task(serialized_fsock: bytes, byte_range, queue):
    open_fsock = pickle.loads(serialized_fsock)
    open_fsock.seek(byte_range[0])
    data = open_fsock.read(byte_range[1] - byte_range[0])
    queue.put(data)


def test_open_file_handlers(byte_range_filepath):
    byte_ranges = [(0, 10), (10, 20), (20, 30)]
    open_fsock = fs.open(byte_range_filepath, mode='rb')
    open_fsock.seek(30)
    serialized_fsock = pickle.dumps(open_fsock)
    queue = Queue()

    jobs = []
    for byte_range in byte_ranges:
        job = Thread(target=process_task, args=(serialized_fsock, byte_range, queue))
        job.start()
        jobs.append(job)

    for job in jobs:
        job.join()

    while not queue.empty():
        assert queue.get() in [b'01________', b'02________', b'03________']
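The same pattern also fits concurrent.futures, which handles the worker and result plumbing that the Process/Queue and Thread versions above wire up by hand. Below is a minimal sketch, not part of the original gist: read_range and read_byte_ranges are hypothetical helpers mirroring the tests above, and 'test.txt' stands in for a file written like the fixture's.

import pickle
from concurrent.futures import ProcessPoolExecutor

import fsspec

fs = fsspec.filesystem('local')


def read_range(serialized_fsock: bytes, byte_range):
    # rehydrate the pickled handle in the worker and read only its slice
    fsock = pickle.loads(serialized_fsock)
    fsock.seek(byte_range[0])
    return fsock.read(byte_range[1] - byte_range[0])


def read_byte_ranges(filepath, byte_ranges):
    # pickle the open handle once in the parent, exactly as the tests above do;
    # each worker gets its own copy and seeks independently
    serialized_fsock = pickle.dumps(fs.open(filepath, mode='rb'))
    with ProcessPoolExecutor() as pool:
        futures = [pool.submit(read_range, serialized_fsock, br) for br in byte_ranges]
        return [f.result() for f in futures]


if __name__ == '__main__':
    # 'test.txt' is a placeholder: a 30-byte file laid out like the fixture above
    print(read_byte_ranges('test.txt', [(0, 10), (10, 20), (20, 30)]))

Returning results through futures instead of a shared Queue also preserves the order of the byte ranges, which the Queue-based versions deliberately do not rely on.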
