@Eboubaker
Last active December 13, 2024 08:36
updated
import contextlib
import threading
from typing import Generator


# Author : github.com/Eboubaker
# Fixed by: github.com/icezyclon
class ReentrantRWLock:
    """This class implements reentrant read-write lock objects.

    A read-write lock can be acquired in read mode or in write mode or both.
    Many different readers are allowed while no thread holds the write lock.
    While a writer holds the write lock, no other threads, aside from the writer,
    may hold the read or the write lock.

    A thread may upgrade the lock to write mode while already holding the read lock.
    Similarly, a thread already having write access may acquire the read lock
    (or may already have it), to retain read access when releasing the write lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock (read or write), the same thread may acquire it
    again without blocking any number of times;
    the thread must release each lock (read/write) the same number of times it has acquired it!

    The lock provides context managers in the form of `for_read()` and `for_write()`,
    which automatically acquire and release the corresponding lock, e.g.,

    >>> with lock.for_read():  # get read access until end of context
    >>>     ...
    >>>     with lock.for_write():  # upgrade to write access until end of inner
    >>>         ...
    """

    def __init__(self) -> None:
        self._writer: int | None = None  # current writer
        self._writer_count: int = 0  # number of times writer holding write lock
        # set of current readers mapping to number of times holding read lock
        # entry is missing if not holding the read lock (no 0 values)
        self._readers: dict[int, int] = dict()
        # main lock + condition, is used for:
        # * protecting read/write access to _writer, _writer_count and _readers
        # * is actively held when having write access (so no other thread has access)
        # * future writers can wait() on the lock to be notified once nobody is reading/writing anymore
        self._lock = threading.Condition(threading.RLock())  # reentrant

    @contextlib.contextmanager
    def for_read(self) -> Generator["ReentrantRWLock", None, None]:
        """
        used for 'with' block, e.g., with lock.for_read(): ...
        """
        # acquire before entering the try block, so that a failed or interrupted
        # acquire does not trigger a release of a lock that was never taken
        self.acquire_read()
        try:
            yield self
        finally:
            self.release_read()

    @contextlib.contextmanager
    def for_write(self) -> Generator["ReentrantRWLock", None, None]:
        """
        used for 'with' block, e.g., with lock.for_write(): ...
        """
        # acquire before entering the try block (see for_read)
        self.acquire_write()
        try:
            yield self
        finally:
            self.release_write()

    def acquire_read(self) -> None:
        """
        Acquire one read lock. Blocks only if another thread has acquired the write lock.
        """
        ident: int = threading.current_thread().ident  # type: ignore
        with self._lock:
            self._readers[ident] = self._readers.get(ident, 0) + 1

    def release_read(self) -> None:
        """
        Release one currently held read lock from this thread.
        """
        ident: int = threading.current_thread().ident  # type: ignore
        with self._lock:
            if ident not in self._readers:
                raise RuntimeError(
                    f"Read lock was released while not holding it by thread {ident}"
                )
            if self._readers[ident] == 1:
                del self._readers[ident]
            else:
                self._readers[ident] -= 1
            if not self._readers:  # if no other readers remain
                self._lock.notify()  # wake the next writer if any

    def acquire_write(self) -> None:
        """
        Acquire one write lock. Blocks until there are no acquired read or write locks from other threads.
        """
        ident: int = threading.current_thread().ident  # type: ignore
        self._lock.acquire()  # is reentrant, so current writer can acquire again
        if self._writer == ident:
            self._writer_count += 1
            return
        # do not be reader while waiting for write or notify will not be called
        times_reading = self._readers.pop(ident, 0)
        while len(self._readers) > 0:
            self._lock.wait()
        self._writer = ident
        self._writer_count += 1
        if times_reading:
            # restore number of read locks thread originally had
            self._readers[ident] = times_reading

    def release_write(self) -> None:
        """
        Release one currently held write lock from this thread.
        """
        if self._writer != threading.current_thread().ident:
            raise RuntimeError(
                f"Write lock was released while not holding it by thread {threading.current_thread().ident}"
            )
        self._writer_count -= 1
        if self._writer_count == 0:
            self._writer = None
            self._lock.notify()  # wake the next writer if any
        self._lock.release()
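
The wait/notify handshake between `acquire_write` and `release_read` can be tried in isolation. The following is only a minimal sketch using the standard library, not the class itself: the `readers` dict, the `events` list and both function names are made up for illustration, and a single pre-registered reader is assumed.

```python
import threading

# Same construction as in the gist: a condition on top of a reentrant lock.
cond = threading.Condition(threading.RLock())
readers = {"reader": 1}  # hypothetical: one reader currently holds the read lock
events = []


def writer():
    with cond:
        # wait() releases the underlying lock while blocked, so the reader
        # can take it to update `readers` and call notify().
        while readers:
            cond.wait()
        events.append("writer proceeds")


def reader_release():
    with cond:
        del readers["reader"]
        events.append("reader released")
        if not readers:  # no readers remain
            cond.notify()  # wake the waiting writer, if any


t = threading.Thread(target=writer)
t.start()
reader_release()
t.join(timeout=5)
print(events)  # ['reader released', 'writer proceeds']
```

The `while readers:` loop re-checks the condition after every wake-up, which is why the writer proceeds correctly whether it starts waiting before or after the reader releases.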
@Eboubaker
Author

Eboubaker commented Oct 27, 2022

Usage

lock = ReentrantRWLock()
lock.acquire_read()
# read stuff. no writer has the lock, other readers may also have the lock
lock.release_read()

lock.acquire_write()
# write stuff. no other thread holds the read or write lock
lock.release_write()

Alternative using with blocks

with lock.for_read():
    ...  # read stuff

with lock.for_write():
    ...  # write stuff

@telecran-telecrit

Your ident = threading.current_thread().ident and self._writer == ident check produces a deadlock within HttpGetHandler::do_GET when readers and writers run on different threads, even with a simple class ThreadingHTTPServer(ThreadingMixIn, HTTPServer).

@Eboubaker
Author

Eboubaker commented May 2, 2024

@telecran-telecrit I don't see the problem. Could you share a code snippet that reproduces the deadlock?

@icezyclon

icezyclon commented Jul 30, 2024

Hi there,
in search of a ReentrantRWLock I found your code.
I was just writing tests for the class when I found this, which seemingly deadlocks:

from utils import ReentrantRWLock  # your lock impl.
import time
from threading import Thread
import pytest  # pip install pytest pytest-timeout

TIMEOUT = 5  # should be at least 3 * SLEEP_TIME

# Some tests use sleeps to simulate possible race conditions
# Define how long this should be - test precision is dependent on this!
SLEEP_TIME = 0.5

@pytest.mark.timeout(TIMEOUT)
def test_multi_threaded_read_write_exclusive():
    lock = ReentrantRWLock()

    def read():
        print("Before read acquire")
        with lock.for_read():
            print("Before read sleep")
            time.sleep(SLEEP_TIME)
            print("read done")
            return

    def write():
        print("Before write acquire")
        with lock.for_write():
            print("Before write sleep")
            time.sleep(SLEEP_TIME)
            print("write done")
            return

    t1 = Thread(name="write", target=write, daemon=True)
    t2 = Thread(name="read", target=read, daemon=True)
    # this different order also deadlocks
    # t1 = Thread(name="read", target=read, daemon=True)
    # t2 = Thread(name="write", target=write, daemon=True)
    start = time.perf_counter()
    t1.start()
    time.sleep(0.01)
    t2.start()
    t1.join()
    t2.join()
    end = time.perf_counter()
    # definitely at least 2 * SLEEP_TIME!
    assert (
        end - start > 1.9 * SLEEP_TIME
    ), f"Time for both joins was {end - start}, should be > {1.9 * SLEEP_TIME}"

The idea of the test case was to measure the time it takes for both threads to join back into the main thread.
If both sleeps happen at the same time (which would be incorrect here), then end - start should be only slightly higher than SLEEP_TIME.
However, I run into the 5-second timeout here, and after taking a look, I believe that the read and write locks switched roles.
When the write lock (which goes first) returns from the __exit__ function, it pops the last element of the _with_ops_write list, which is a 0, leaving a 1 from itself.
In fact, I am not sure how you can be sure to always access the correct element in _with_ops_write as the order the locks could be released in might change.
Alternatively, maybe I missed something? Could you maybe take a look at this?
Note: this test case works as intended when calling acquire_read/write and release_read/write directly instead of using the context managers.
Thanks a lot
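
The timing argument behind the test can be checked on its own. The sketch below uses a plain `threading.Lock` as a stand-in for the lock under test, so it says nothing about `ReentrantRWLock` itself; `timed_run` is a hypothetical helper, and `SLEEP_TIME` is shortened to 0.2 s to keep the run quick.

```python
import threading
import time

SLEEP_TIME = 0.2  # assumption: shorter than the 0.5 s used in the test above


def timed_run(lock=None):
    """Start two sleeping threads and return the seconds until both have joined."""

    def work():
        if lock is None:
            time.sleep(SLEEP_TIME)  # no exclusion: sleeps may overlap
        else:
            with lock:  # exclusion: only one thread sleeps at a time
                time.sleep(SLEEP_TIME)

    threads = [threading.Thread(target=work) for _ in range(2)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


exclusive = timed_run(threading.Lock())  # sleeps are serialized
overlapping = timed_run()  # sleeps run at the same time

print(f"exclusive:   {exclusive:.2f}s (expected > {1.9 * SLEEP_TIME:.2f}s)")
print(f"overlapping: {overlapping:.2f}s (expected < {1.9 * SLEEP_TIME:.2f}s)")
```

With mutual exclusion the two sleeps add up to at least 2 * SLEEP_TIME, while without it the total stays close to a single SLEEP_TIME, which is the distinction the `1.9 * SLEEP_TIME` threshold relies on.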

@icezyclon

icezyclon commented Jul 30, 2024

I forked your repo and tried my hand at a re-implementation using context managers to fix the problem of "not knowing if we are writing or not".
This at least seems to fix my test case above.
Also, it has the advantage of not allowing anyone to enter the context without explicitly calling either the for_read or for_write methods.

Update:
I also found that the reentrant part of the lock is broken in the original code.
If you acquire the same lock multiple times and then release it once, the thread no longer holds the lock, even though it should still hold it (multiple times). I have a counting/fixed version here.
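
The counting idea can be illustrated without the full lock. This is only a sketch of per-thread hold bookkeeping in the style of the `_readers` dict; the `HoldCounter` class and its method names are hypothetical and do no blocking at all.

```python
import threading


class HoldCounter:
    """Hypothetical helper: per-thread hold counts, similar to the `_readers` dict."""

    def __init__(self):
        self._counts = {}  # thread ident -> number of holds (no 0 values)
        self._mutex = threading.Lock()  # protects _counts

    def acquire(self):
        ident = threading.get_ident()
        with self._mutex:
            self._counts[ident] = self._counts.get(ident, 0) + 1

    def release(self):
        ident = threading.get_ident()
        with self._mutex:
            if ident not in self._counts:
                raise RuntimeError(f"thread {ident} released without holding")
            if self._counts[ident] == 1:
                del self._counts[ident]  # last hold released, drop the entry
            else:
                self._counts[ident] -= 1

    def holds(self):
        """Return how many times the calling thread currently holds."""
        with self._mutex:
            return self._counts.get(threading.get_ident(), 0)


counter = HoldCounter()
counter.acquire()
counter.acquire()
counter.acquire()
counter.release()
remaining = counter.holds()
print(remaining)  # 2: a single release must not drop all three holds
```

A regression test for the reentrancy bug could follow the same shape against the real lock: acquire several times, release once, then assert that the thread is still registered as a holder.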

@Eboubaker
Author

@icezyclon sorry for the late reply.
You're right. I haven't done extensive testing like you did: I wrote this class in a hurry for another project, and the issue never showed up there, so I didn't notice it.
Thanks for your work and extensive testing. I have updated my gist with your version.
