Skip to content

Instantly share code, notes, and snippets.

@brfrn169
Created December 21, 2015 09:59
Show Gist options
  • Save brfrn169/15a874594be2fb9d6ea0 to your computer and use it in GitHub Desktop.
Save brfrn169/15a874594be2fb9d6ea0 to your computer and use it in GitHub Desktop.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
index 96af2c3..1e18c32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
@@ -19,12 +19,15 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.Collections;
import java.util.LinkedList;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
/**
* Manages the read/write consistency within memstore. This provides
@@ -38,6 +41,8 @@ public class MultiVersionConsistencyControl {
private volatile long memstoreRead = 0;
private final Object readWaiters = new Object();
+ private final Set<WriteEntry> advancedNoWriteNumberWriteEntries = Collections.newSetFromMap(new ConcurrentHashMap<WriteEntry, Boolean>());
+
// This is the pending queue of writes.
private final LinkedList<WriteEntry> writeQueue =
new LinkedList<WriteEntry>();
@@ -158,6 +163,10 @@ public class MultiVersionConsistencyControl {
// Using Max because Edit complete in WAL sync order not arriving order
nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
writeQueue.removeFirst();
+
+ if (queueFirst.getWriteNumber() == NO_WRITE_NUMBER) {
+ advancedNoWriteNumberWriteEntries.add(queueFirst);
+ }
} else {
break;
}
@@ -166,12 +175,9 @@ public class MultiVersionConsistencyControl {
if (nextReadValue > memstoreRead) {
memstoreRead = nextReadValue;
}
-
- // notify waiters on writeQueue before return
- writeQueue.notifyAll();
}
-
- if (nextReadValue > 0) {
+
+ if (nextReadValue >= 0) {
synchronized (readWaiters) {
readWaiters.notifyAll();
}
@@ -204,40 +210,29 @@ public class MultiVersionConsistencyControl {
}
public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
- boolean interrupted = false;
- WriteEntry w = waitedEntry;
+ advanceMemstore(waitedEntry);
- try {
- WriteEntry firstEntry = null;
- do {
- synchronized (writeQueue) {
- // writeQueue won't be empty at this point, the following is just a safety check
- if (writeQueue.isEmpty()) {
- break;
- }
- firstEntry = writeQueue.getFirst();
- if (firstEntry == w) {
- // all previous in-flight transactions are done
- break;
+ boolean interrupted = false;
+ synchronized (readWaiters) {
+ try {
+ if (waitedEntry.getWriteNumber() == NO_WRITE_NUMBER) {
+ while (!advancedNoWriteNumberWriteEntries.contains(waitedEntry)) {
+ readWaiters.wait(0);
}
- try {
- writeQueue.wait(0);
- } catch (InterruptedException ie) {
- // We were interrupted... finish the loop -- i.e. cleanup --and then
- // on our way out, reset the interrupt flag.
- interrupted = true;
- break;
+ advancedNoWriteNumberWriteEntries.remove(waitedEntry);
+ } else {
+ while (memstoreRead < waitedEntry.getWriteNumber()) {
+ readWaiters.wait(0);
}
}
- } while (firstEntry != null);
- } finally {
- if (w != null) {
- advanceMemstore(w);
+ } catch (InterruptedException ie) {
+ // We were interrupted... finish the loop -- i.e. cleanup --and then
+ // on our way out, reset the interrupt flag.
+ interrupted = true;
}
}
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
+
+ if (interrupted) Thread.currentThread().interrupt();
}
public long memstoreReadPoint() {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment