-
-
Save otrack/65825cde0ddf2c020b8d4600d48e8359 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/bin/image.sh b/bin/image.sh | |
index 7a8d7fb..40ee452 100755 | |
--- a/bin/image.sh | |
+++ b/bin/image.sh | |
@@ -7,7 +7,7 @@ else | |
fi | |
DIR=$(dirname "$0") | |
-IMAGE=vitorenesduarte/vcd-java-client:${TAG} | |
+IMAGE=0track/vcd-java-client:${TAG} | |
DOCKERFILE=${DIR}/../Dockerfiles/vcd-java-client | |
# release vcd-java-client | |
diff --git a/src/main/java/org/imdea/vcd/DataRW.java b/src/main/java/org/imdea/vcd/DataRW.java | |
index 6b6eb55..ea9cf2a 100644 | |
--- a/src/main/java/org/imdea/vcd/DataRW.java | |
+++ b/src/main/java/org/imdea/vcd/DataRW.java | |
@@ -6,12 +6,21 @@ import com.codahale.metrics.MetricAttribute; | |
import com.codahale.metrics.MetricRegistry; | |
import com.codahale.metrics.Timer; | |
import com.google.protobuf.ByteString; | |
+import org.imdea.vcd.pb.Proto.Commit; | |
import org.imdea.vcd.pb.Proto.Message; | |
import org.imdea.vcd.pb.Proto.MessageSet; | |
import org.imdea.vcd.pb.Proto.Reply; | |
-import org.imdea.vcd.queue.box.CommittedQueueBox; | |
+import org.imdea.vcd.queue.ConfQueue; | |
import org.imdea.vcd.queue.DepQueue; | |
+import org.imdea.vcd.queue.Queue; | |
+import org.imdea.vcd.queue.QueueAddArgs; | |
+import org.imdea.vcd.queue.QueueType; | |
+import org.imdea.vcd.queue.RandomQueue; | |
+import org.imdea.vcd.queue.box.CommittedQueueBox; | |
import org.imdea.vcd.queue.clock.Clock; | |
+import org.imdea.vcd.queue.clock.Dot; | |
+import org.imdea.vcd.queue.clock.ExceptionSet; | |
+import org.imdea.vcd.queue.clock.MaxInt; | |
import java.io.DataInputStream; | |
import java.io.DataOutputStream; | |
@@ -22,19 +31,11 @@ import java.util.HashSet; | |
import java.util.List; | |
import java.util.Optional; | |
import java.util.Set; | |
+import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
-import org.imdea.vcd.pb.Proto.Commit; | |
-import org.imdea.vcd.queue.ConfQueue; | |
-import org.imdea.vcd.queue.Queue; | |
-import org.imdea.vcd.queue.QueueAddArgs; | |
-import org.imdea.vcd.queue.QueueType; | |
-import org.imdea.vcd.queue.RandomQueue; | |
-import org.imdea.vcd.queue.clock.Dot; | |
-import org.imdea.vcd.queue.clock.ExceptionSet; | |
-import org.imdea.vcd.queue.clock.MaxInt; | |
/** | |
* | |
@@ -68,6 +69,7 @@ public class DataRW { | |
private final LinkedBlockingQueue<MessageSet> toWriter; | |
private final LinkedBlockingQueue<Optional<MessageSet>> toClient; | |
+ private final Set<ByteString> chains; | |
private final Writer writer; | |
private final SocketReader socketReader; | |
@@ -77,8 +79,9 @@ public class DataRW { | |
this.batchWait = config.getBatchWait(); | |
this.toWriter = new LinkedBlockingQueue<>(); | |
this.toClient = new LinkedBlockingQueue<>(); | |
+ this.chains = ConcurrentHashMap.newKeySet(); | |
this.writer = new Writer(this.out, this.toWriter, this.batchWait); | |
- this.socketReader = new SocketReader(this.in, this.toClient, config); | |
+ this.socketReader = new SocketReader(this.in, this.toClient, this.chains, config); | |
} | |
public void start() { | |
@@ -111,6 +114,20 @@ public class DataRW { | |
if (this.batchWait > 0) { | |
toWriter.put(messageSet); | |
} else { | |
+ boolean canWrite; | |
+ do { | |
+ canWrite = true; | |
+ mainLoop: | |
+ for (Message message : messageSet.getMessagesList()) { | |
+ for (ByteString hash : message.getHashesList()) { | |
+ if (chains.contains(hash)){ | |
+ canWrite = false; | |
+ break mainLoop; | |
+ } | |
+ } | |
+ } | |
+ if (!canWrite) Thread.sleep(1); | |
+ } while (!canWrite); | |
doWrite(messageSet, this.out); | |
} | |
} | |
@@ -208,12 +225,12 @@ public class DataRW { | |
private final LinkedBlockingQueue<Reply> toDeliverer; | |
private final Deliverer deliverer; | |
- public SocketReader(DataInputStream in, LinkedBlockingQueue<Optional<MessageSet>> toClient, Config config) { | |
+ public SocketReader(DataInputStream in, LinkedBlockingQueue<Optional<MessageSet>> toClient, Set<ByteString> chains, Config config) { | |
this.in = in; | |
this.toClient = toClient; | |
this.batchWait = config.getBatchWait(); | |
this.toDeliverer = new LinkedBlockingQueue<>(); | |
- this.deliverer = new Deliverer(this.toClient, this.toDeliverer, config); | |
+ this.deliverer = new Deliverer(this.toClient, chains, this.toDeliverer, config); | |
} | |
public void close() { | |
@@ -281,6 +298,7 @@ public class DataRW { | |
private final Sorter sorter; | |
private final QueueType queueType; | |
private Queue<CommittedQueueBox> queue; | |
+ private final Set<ByteString> chains; | |
// metrics | |
private final Timer toAdd; | |
@@ -289,7 +307,7 @@ public class DataRW { | |
private final Histogram queueSize; | |
private final Histogram queueElements; | |
- public Deliverer(LinkedBlockingQueue<Optional<MessageSet>> toClient, LinkedBlockingQueue<Reply> toDeliverer, Config config) { | |
+ public Deliverer(LinkedBlockingQueue<Optional<MessageSet>> toClient, Set<ByteString> chains,LinkedBlockingQueue<Reply> toDeliverer, Config config) { | |
createBox = METRICS.timer(MetricRegistry.name(DataRW.class, "createBox")); | |
toAdd = METRICS.timer(MetricRegistry.name(DataRW.class, "toAdd")); | |
@@ -302,6 +320,7 @@ public class DataRW { | |
this.toSorter = new LinkedBlockingQueue<>(); | |
this.sorter = new Sorter(toClient, this.toSorter, config); | |
this.queueType = config.getQueueType(); | |
+ this.chains = chains; | |
} | |
public void close() { | |
@@ -354,7 +373,14 @@ public class DataRW { | |
queueElements.update(queue.elements()); | |
if (!toDeliver.isEmpty()) { | |
+ for (Message message : queue.first().getMessages()) { | |
+ chains.removeAll(message.getHashesList()); | |
+ } | |
toSorter.put(toDeliver); | |
+ }else if (queue.size() > 0) { | |
+ for (Message message : queue.first().getMessages()) { | |
+ chains.addAll(message.getHashesList()); | |
+ } | |
} | |
break; | |
default: | |
diff --git a/src/main/java/org/imdea/vcd/queue/ConfQueue.java b/src/main/java/org/imdea/vcd/queue/ConfQueue.java | |
index 8a0d880..3f907fa 100644 | |
--- a/src/main/java/org/imdea/vcd/queue/ConfQueue.java | |
+++ b/src/main/java/org/imdea/vcd/queue/ConfQueue.java | |
@@ -1,19 +1,19 @@ | |
package org.imdea.vcd.queue; | |
+import org.imdea.vcd.queue.box.QueueBox; | |
+import org.imdea.vcd.queue.clock.Clock; | |
+import org.imdea.vcd.queue.clock.Dot; | |
+import org.imdea.vcd.queue.clock.Dots; | |
+import org.imdea.vcd.queue.clock.ExceptionSet; | |
+ | |
import java.util.ArrayDeque; | |
import java.util.ArrayList; | |
import java.util.Deque; | |
import java.util.HashMap; | |
-import java.util.HashSet; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
-import org.imdea.vcd.queue.box.QueueBox; | |
-import org.imdea.vcd.queue.clock.Clock; | |
-import org.imdea.vcd.queue.clock.Dot; | |
-import org.imdea.vcd.queue.clock.Dots; | |
-import org.imdea.vcd.queue.clock.ExceptionSet; | |
/** | |
* | |
@@ -26,6 +26,7 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> { | |
private final HashMap<Dot, E> dotToBox = new HashMap<>(); | |
private final HashMap<Dot, Clock<ExceptionSet>> dotToConf = new HashMap<>(); | |
private List<E> toDeliver = new ArrayList<>(); | |
+ E first; | |
private final Clock<ExceptionSet> committed; | |
private final Clock<ExceptionSet> delivered; | |
@@ -139,6 +140,11 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> { | |
throw new UnsupportedOperationException("Method not supported."); | |
} | |
+ @Override | |
+ public E first() { | |
+ return first; | |
+ } | |
+ | |
@Override | |
public int size() { | |
return this.dotToConf.size(); | |
@@ -183,6 +189,8 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> { | |
// get conf | |
Clock<ExceptionSet> conf = dotToConf.get(at); | |
+ first = dotToBox.get(at); | |
+ | |
// if not all deps are committed, give up | |
boolean allDepsCommitted = conf.subtractIsBottom(committed); | |
if (!allDepsCommitted) { | |
diff --git a/src/main/java/org/imdea/vcd/queue/DepQueue.java b/src/main/java/org/imdea/vcd/queue/DepQueue.java | |
index b9c34aa..f2d308b 100644 | |
--- a/src/main/java/org/imdea/vcd/queue/DepQueue.java | |
+++ b/src/main/java/org/imdea/vcd/queue/DepQueue.java | |
@@ -133,6 +133,11 @@ public class DepQueue<E extends QueueBox> implements Queue<E> { | |
return result; | |
} | |
+ @Override | |
+ public E first() { | |
+ return first.item; | |
+ } | |
+ | |
/** | |
* Find element that depends on e. | |
*/ | |
diff --git a/src/main/java/org/imdea/vcd/queue/Queue.java b/src/main/java/org/imdea/vcd/queue/Queue.java | |
index 0eba674..8f17b9c 100644 | |
--- a/src/main/java/org/imdea/vcd/queue/Queue.java | |
+++ b/src/main/java/org/imdea/vcd/queue/Queue.java | |
@@ -18,6 +18,8 @@ public interface Queue<E extends QueueBox> { | |
List<E> toList(); | |
+ E first(); | |
+ | |
int size(); | |
int elements(); | |
diff --git a/src/main/java/org/imdea/vcd/queue/RandomQueue.java b/src/main/java/org/imdea/vcd/queue/RandomQueue.java | |
index 4a1499f..cf108c7 100644 | |
--- a/src/main/java/org/imdea/vcd/queue/RandomQueue.java | |
+++ b/src/main/java/org/imdea/vcd/queue/RandomQueue.java | |
@@ -1,11 +1,12 @@ | |
package org.imdea.vcd.queue; | |
+import org.imdea.vcd.queue.box.QueueBox; | |
+ | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.ThreadLocalRandom; | |
-import org.imdea.vcd.queue.box.QueueBox; | |
/** | |
* | |
@@ -68,6 +69,11 @@ public class RandomQueue<E extends QueueBox> implements Queue<E> { | |
throw new UnsupportedOperationException("Method not supported."); | |
} | |
+ @Override | |
+ public E first() { | |
+ return this.toList().get(0); | |
+ } | |
+ | |
@Override | |
public int size() { | |
return size; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment