Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A test for Redis-based file locking when rapidly updating a JSON document (doesn't work)
#!/usr/bin/env python
"""Script to stress test rapid JSON DB writes.
1. When reading from disk, flock(LOCK_SH) is used on the file handle.
2. When writing to disk:
a. A lock is acquired in a Redis instance as added protection
b. File is locked with LOCK_EX during the write
c. After the file is written (while still Redis-locked), it attempts to read
the file again immediately (LOCK_SH) to verify that it wrote successfully
Issues:
* When using a simple Redis lock (just GET, SET on a key) these operations are
not atomic and two competing threads will "get" the lock at the same time.
* Often, JSON writes will fail (sanity test in write_json when it re-reads
what was just written) - usually the final } at the end of the file is missing
or there is a duplicate } or some other issue.
* redis-py's lock() system seems not to work.
Here is some output when running with 2 processes:
1418324325.89 6569 Has the write lock!
1418324325.89 6568 Has the write lock!
1418324326.89 6568 Wrote the JSON
1418324326.89 6569 Wrote the JSON
1418324327.9 6568 Re-read the JSON: {
"hits": 3948
}
6568 is trying to drop the lock
1418324327.9 6569 Re-read the JSON: {
"hits": 3948
}
6569 is trying to drop the lock
Traceback (most recent call last):
File "json-stresstest.py", line 114, in <module>
main()
1418324327.9 6568 Has the write lock!
File "json-stresstest.py", line 19, in main
pool.map(job, task_generator())
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 558, in get
1418324327.9 6569 Has the write lock!
raise self._value
redis.exceptions.LockError: Cannot release an unlocked lock
As you can see, PID 6569 and 6568 both acquire the lock at the same time, write
the same JSON to disk, and then raise an exception when they release the lock
in the wrong order. Is this a bug in Redis, or redis-py? redis-py's lock uses
Lua scripts on the Redis server and the implementation of it matches up with
other implementations referred to from http://redis.io/topics/distlock
The expected output should be along these lines:
* PID 1 has the write lock!
* PID 1 wrote the JSON
* PID 1 Re-read the JSON: [intact json data here]
* PID 1 is trying to drop the lock
* PID 1 has dropped the lock
* PID 2 has the write lock!
* ... and so on ..."""
import multiprocessing
import json
import redis
import time
import os
import os.path
import codecs
from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
conn = redis.StrictRedis()
HITS_FILE = "hits.json"
def main():
    """Drive the stress test: fan the task stream out over worker processes."""
    # Two workers are enough to provoke the locking race this gist studies.
    pool = multiprocessing.Pool(2)
    pool.map(job, task_generator())
    pool.close()
    pool.join()
def job(task):
    """One unit of work: load the JSON DB, bump the hit counter, save it.

    ``task`` is an opaque task number from task_generator and is unused.
    """
    db = read_json()
    if not db:
        # Missing, unreadable, or empty DB: start counting from zero.
        db = {"hits": 0}
    db["hits"] += 1
    write_json(db)
def read_json():
if not os.path.isfile(HITS_FILE):
return None
fh = codecs.open(HITS_FILE, "r", "utf-8")
flock(fh, LOCK_SH)
text = fh.read()
flock(fh, LOCK_UN)
fh.close()
data = None
try:
data = json.loads(text)
except Exception as e:
print "Couldn't decode JSON data from {}".format(HITS_FILE)
print e
return data
def write_json(data):
"""Write JSON to disk."""
task_id = os.getpid()
# For added write security, use Redis to lock.
lock = redis_lock()
if not lock:
print "ERROR: Failed to acquire lock!"
return
print time.time(), task_id, "Has the write lock!"
time.sleep(1)
fh = codecs.open(HITS_FILE, "w", "utf-8")
flock(fh, LOCK_EX)
fh.write(json.dumps(data, sort_keys=True, indent=4, separators=(',', ': ')))
flock(fh, LOCK_UN)
fh.close()
print time.time(), task_id, "Wrote the JSON"
time.sleep(1)
# Verify the write went okay
fh = codecs.open(HITS_FILE, "r", "utf-8")
flock(fh, LOCK_SH)
text = fh.read()
flock(fh, LOCK_UN)
fh.close()
print time.time(), task_id, "Re-read the JSON:", text
redis_unlock(lock)
try:
json.loads(text)
except Exception as e:
print "JSON WRITE FAILED!", e
print text
def redis_lock(key=HITS_FILE, timeout=5, expire=20):
lock_key = key + "_lock"
return conn.lock(lock_key, timeout=expire)
# The "simpler" locking that isn't atomic and doesn't work :P
# begin = int(time.time())
# lock = conn.get(lock_key)
# while lock:
# time.sleep(0.2)
# lock = conn.get(lock_key)
# if int(time.time()) - begin >= timeout:
# print "Lock timed out!"
# return None
#
# # Take the lock.
# conn.set(lock_key, time.time())
# conn.expire(expire)
# return True
def redis_unlock(lock):
print os.getpid(), "is trying to drop the lock"
lock.release()
print os.getpid(), "has dropped the lock"
# Simpler one
# conn.delete(key + "_lock")
def task_generator():
    """Yield the task numbers 0..9999, one per job."""
    for task_id in range(10000):
        yield task_id
if __name__ == "__main__":
main()
@kirsle

This comment has been minimized.

Copy link
Owner Author

kirsle commented Dec 11, 2014

redis-py's lock() implementation doesn't work, but using the basic setnx does:

def redis_lock(key=HITS_FILE, timeout=5, expire=20):
    lock_key = key + "_lock"

    begin = int(time.time())
    lock = conn.setnx(lock_key, time.time())
    while not lock:
        time.sleep(0.2)
        lock = conn.setnx(lock_key, time.time())
        if int(time.time()) - begin >= timeout:
            print "Lock timed out!"
            return None

    # Prevent stale locks
    conn.expire(lock_key, expire)

    return True


def redis_unlock(key=HITS_FILE):
    conn.delete(key + "_lock")
@kirsle

This comment has been minimized.

Copy link
Owner Author

kirsle commented Dec 11, 2014

redis-py's docs are unclear and imply that client.lock() will actually acquire the lock at that time. It doesn't.

lock = conn.lock(key_name)
lock.acquire() # <--- This actually acquires the lock

# do the needful

lock.release()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.