Skip to content

Instantly share code, notes, and snippets.

@stain
Last active October 9, 2019 00:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stain/3ebcada5a641e3e0a81096bd2c8ac1b5 to your computer and use it in GitHub Desktop.
Save stain/3ebcada5a641e3e0a81096bd2c8ac1b5 to your computer and use it in GitHub Desktop.
A self-locking queue of locks - DO NOT USE https://twitter.com/soilandreyes/status/1181587178121502720
/*
* Copyright 2019 Stian Soiland-Reyes
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lockqueue;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
/**
* A Queue of Locks
* <p>
* This queue can hold a collection of {@link Lock}s ready for future use.
* <p>
* The queue is implemented as a primitive linked list intended to be
* modified by multiple threads concurrently,
* utilizing the queued locks for its internal synchronization.
* <p>
* This implementation has particularly low performance and may
* give your stack some exercise. It is also fairly
* buggy, almost guaranteeing a deadlock.
*
*/
public class LockQueue extends AbstractQueue<Lock> implements Queue<Lock> {
private final Lock lock;
private final int maxCapacity;
private volatile LockQueue next;
/**
* Construct an empty, unbounded lock queue.
*
* @param lock Lock to use for locking this queue.
*/
public LockQueue(Lock lock) {
this(lock, null, Integer.MAX_VALUE);
}
/**
* Construct an empty, bounded lock queue.
*
* @param lock Lock to use for locking this queue.
* @param maxCapacity Maximum elements in queue
*/
public LockQueue(Lock lock, int maxCapacity) {
this(lock, null, Integer.MAX_VALUE);
}
private LockQueue(Lock lock, LockQueue next, int maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalStateException("negative maxCapacity");
}
this.lock = Objects.requireNonNull(lock);
this.next = next;
this.maxCapacity = maxCapacity;
}
@Override
public boolean offer(Lock e) {
if (!e.tryLock()) {
throw new IllegalArgumentException("Cannot acquire inserted lock: " + e);
}
lock.lock();
try {
if (maxCapacity < 1) {
throw new IllegalStateException("Maximum queue capacity reached");
}
if (next == null || next == this) {
next = new LockQueue(e, maxCapacity - 1);
} else {
LockQueue tail = findTail();
tail.offer(e);
}
return true;
} finally {
e.unlock();
lock.unlock();
}
}
private LockQueue findTail() {
LockQueue current = this;
current.lock.lock();
while (true) {
Lock currentLock = current.lock;
try {
if (current.next == null) {
return current;
}
current.next.lock.lock(); // Lock it before next iteration in loop
current = current.next;
} finally {
currentLock.unlock();
}
}
}
@Override
public Lock poll() {
lock.lock();
try {
if (next == null) {
return null; // empty (we won't give away OUR lock)
}
Lock e = next.lock; // ..but this one
e.lock();
try {
next = next.next;
} finally {
e.unlock();
}
return e;
} finally {
lock.unlock();
}
}
@Override
public Lock peek() {
lock.lock();
try {
if (next == null) {
return null;
}
Lock e = next.lock;
return e;
} finally {
lock.unlock();
}
}
@Override
public Iterator<Lock> iterator() {
return new Iterator<Lock>() {
private volatile LockQueue current = LockQueue.this;
@Override
public boolean hasNext() {
return current != null && ! current.isEmpty();
}
@Override
public Lock next() {
Lock mylock = current.element();
mylock.lock(); // ..it was returned unlocked
try {
current = current.next;
return mylock;
} finally {
mylock.unlock();
}
}
};
}
@Override
public int size() {
lock.lock();
try {
if (next == null) {
return 0;
} else {
Iterator<Lock> it = iterator();
int size=0;
while(it.hasNext()) {
size++;
it.next();
}
return size;
}
} finally {
lock.unlock();
}
}
@Override
public boolean isEmpty() {
lock.lock();
try {
return next == null;
} finally {
lock.unlock();
}
}
}
/*
* Copyright 2019 Stian Soiland-Reyes
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lockqueue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.Test;
/**
 * JUnit tests exercising {@link LockQueue} from a single thread and from
 * several threads at once.
 */
public class TestLockQueue {

    /**
     * A fresh queue is empty; after adding one lock it reports size 1, hands
     * back that same lock from repeated {@code element()} calls and from
     * {@code poll()}, and is empty again afterwards.
     */
    @Test
    public void singleAddAndRemove() throws Exception {
        LockQueue underTest = new LockQueue(new ReentrantLock());
        assertTrue(underTest.isEmpty());
        assertNull(underTest.peek());
        assertEquals(0, underTest.size());

        ReentrantLock only = new ReentrantLock();
        underTest.add(only);
        assertFalse(underTest.isEmpty());
        assertEquals(1, underTest.size());
        // element() must not consume the head, so ask twice.
        for (int attempt = 0; attempt < 2; attempt++) {
            assertSame(only, underTest.element());
        }
        assertSame(only, underTest.poll());
        assertTrue(underTest.isEmpty());
    }

    /**
     * Ten locks offered from one thread are reported by size(), toArray()
     * and peek(), and come back out of poll() in insertion order.
     */
    @Test
    public void sequentialMany() throws Exception {
        LockQueue underTest = new LockQueue(new ReentrantLock());
        assertFalse(underTest.iterator().hasNext());

        List<Lock> inserted = new ArrayList<>();
        int remaining = 10;
        while (remaining-- > 0) {
            ReentrantLock fresh = new ReentrantLock();
            inserted.add(fresh);
            underTest.offer(fresh);
        }

        assertTrue(underTest.iterator().hasNext());
        assertEquals(10, underTest.size());
        assertArrayEquals(inserted.toArray(), underTest.toArray());
        assertEquals(inserted.get(0), underTest.peek());
        // ensure FIFO, oldest first
        for (Lock expected : inserted) {
            assertEquals(expected, underTest.poll());
        }
        assertEquals(0, underTest.size());
    }

    /**
     * Eight threads each add 5000 locks to one shared queue, occasionally
     * polling a lock and offering it straight back; the final size must
     * equal the total number of locks added.
     */
    @Test
    public void threadedMany() throws Exception {
        final int workerCount = 8;
        final int locksPerWorker = 5000;
        LockQueue shared = new LockQueue(new ReentrantLock());

        Runnable workload = () -> {
            for (int round = 0; round < locksPerWorker; round++) {
                shared.add(new ReentrantLock());
                if (round % 7 == 0) {
                    // now and then shuffle around a bit
                    Lock recycled = shared.poll();
                    shared.offer(recycled);
                }
            }
        };

        List<Thread> workers = new ArrayList<Thread>();
        for (int started = 0; started < workerCount; started++) {
            Thread worker = new Thread(workload);
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }

        assertEquals(workerCount * locksPerWorker, shared.size());
        shared.clear();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment