Skip to content

Instantly share code, notes, and snippets.

@otrack
Created October 3, 2018 10:05
Show Gist options
  • Save otrack/65825cde0ddf2c020b8d4600d48e8359 to your computer and use it in GitHub Desktop.
Save otrack/65825cde0ddf2c020b8d4600d48e8359 to your computer and use it in GitHub Desktop.
diff --git a/bin/image.sh b/bin/image.sh
index 7a8d7fb..40ee452 100755
--- a/bin/image.sh
+++ b/bin/image.sh
@@ -7,7 +7,7 @@ else
fi
DIR=$(dirname "$0")
-IMAGE=vitorenesduarte/vcd-java-client:${TAG}
+IMAGE=0track/vcd-java-client:${TAG}
DOCKERFILE=${DIR}/../Dockerfiles/vcd-java-client
# release vcd-java-client
diff --git a/src/main/java/org/imdea/vcd/DataRW.java b/src/main/java/org/imdea/vcd/DataRW.java
index 6b6eb55..ea9cf2a 100644
--- a/src/main/java/org/imdea/vcd/DataRW.java
+++ b/src/main/java/org/imdea/vcd/DataRW.java
@@ -6,12 +6,21 @@ import com.codahale.metrics.MetricAttribute;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.protobuf.ByteString;
+import org.imdea.vcd.pb.Proto.Commit;
import org.imdea.vcd.pb.Proto.Message;
import org.imdea.vcd.pb.Proto.MessageSet;
import org.imdea.vcd.pb.Proto.Reply;
-import org.imdea.vcd.queue.box.CommittedQueueBox;
+import org.imdea.vcd.queue.ConfQueue;
import org.imdea.vcd.queue.DepQueue;
+import org.imdea.vcd.queue.Queue;
+import org.imdea.vcd.queue.QueueAddArgs;
+import org.imdea.vcd.queue.QueueType;
+import org.imdea.vcd.queue.RandomQueue;
+import org.imdea.vcd.queue.box.CommittedQueueBox;
import org.imdea.vcd.queue.clock.Clock;
+import org.imdea.vcd.queue.clock.Dot;
+import org.imdea.vcd.queue.clock.ExceptionSet;
+import org.imdea.vcd.queue.clock.MaxInt;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -22,19 +31,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.imdea.vcd.pb.Proto.Commit;
-import org.imdea.vcd.queue.ConfQueue;
-import org.imdea.vcd.queue.Queue;
-import org.imdea.vcd.queue.QueueAddArgs;
-import org.imdea.vcd.queue.QueueType;
-import org.imdea.vcd.queue.RandomQueue;
-import org.imdea.vcd.queue.clock.Dot;
-import org.imdea.vcd.queue.clock.ExceptionSet;
-import org.imdea.vcd.queue.clock.MaxInt;
/**
*
@@ -68,6 +69,7 @@ public class DataRW {
private final LinkedBlockingQueue<MessageSet> toWriter;
private final LinkedBlockingQueue<Optional<MessageSet>> toClient;
+ private final Set<ByteString> chains;
private final Writer writer;
private final SocketReader socketReader;
@@ -77,8 +79,9 @@ public class DataRW {
this.batchWait = config.getBatchWait();
this.toWriter = new LinkedBlockingQueue<>();
this.toClient = new LinkedBlockingQueue<>();
+ this.chains = ConcurrentHashMap.newKeySet();
this.writer = new Writer(this.out, this.toWriter, this.batchWait);
- this.socketReader = new SocketReader(this.in, this.toClient, config);
+ this.socketReader = new SocketReader(this.in, this.toClient, this.chains, config);
}
public void start() {
@@ -111,6 +114,20 @@ public class DataRW {
if (this.batchWait > 0) {
toWriter.put(messageSet);
} else {
+ boolean canWrite;
+ do {
+ canWrite = true;
+ mainLoop:
+ for (Message message : messageSet.getMessagesList()) {
+ for (ByteString hash : message.getHashesList()) {
+ if (chains.contains(hash)){
+ canWrite = false;
+ break mainLoop;
+ }
+ }
+ }
+ if (!canWrite) Thread.sleep(1);
+ } while (!canWrite);
doWrite(messageSet, this.out);
}
}
@@ -208,12 +225,12 @@ public class DataRW {
private final LinkedBlockingQueue<Reply> toDeliverer;
private final Deliverer deliverer;
- public SocketReader(DataInputStream in, LinkedBlockingQueue<Optional<MessageSet>> toClient, Config config) {
+ public SocketReader(DataInputStream in, LinkedBlockingQueue<Optional<MessageSet>> toClient, Set<ByteString> chains, Config config) {
this.in = in;
this.toClient = toClient;
this.batchWait = config.getBatchWait();
this.toDeliverer = new LinkedBlockingQueue<>();
- this.deliverer = new Deliverer(this.toClient, this.toDeliverer, config);
+ this.deliverer = new Deliverer(this.toClient, chains, this.toDeliverer, config);
}
public void close() {
@@ -281,6 +298,7 @@ public class DataRW {
private final Sorter sorter;
private final QueueType queueType;
private Queue<CommittedQueueBox> queue;
+ private final Set<ByteString> chains;
// metrics
private final Timer toAdd;
@@ -289,7 +307,7 @@ public class DataRW {
private final Histogram queueSize;
private final Histogram queueElements;
- public Deliverer(LinkedBlockingQueue<Optional<MessageSet>> toClient, LinkedBlockingQueue<Reply> toDeliverer, Config config) {
+ public Deliverer(LinkedBlockingQueue<Optional<MessageSet>> toClient, Set<ByteString> chains,LinkedBlockingQueue<Reply> toDeliverer, Config config) {
createBox = METRICS.timer(MetricRegistry.name(DataRW.class, "createBox"));
toAdd = METRICS.timer(MetricRegistry.name(DataRW.class, "toAdd"));
@@ -302,6 +320,7 @@ public class DataRW {
this.toSorter = new LinkedBlockingQueue<>();
this.sorter = new Sorter(toClient, this.toSorter, config);
this.queueType = config.getQueueType();
+ this.chains = chains;
}
public void close() {
@@ -354,7 +373,14 @@ public class DataRW {
queueElements.update(queue.elements());
if (!toDeliver.isEmpty()) {
+ for (Message message : queue.first().getMessages()) {
+ chains.removeAll(message.getHashesList());
+ }
toSorter.put(toDeliver);
+ }else if (queue.size() > 0) {
+ for (Message message : queue.first().getMessages()) {
+ chains.addAll(message.getHashesList());
+ }
}
break;
default:
diff --git a/src/main/java/org/imdea/vcd/queue/ConfQueue.java b/src/main/java/org/imdea/vcd/queue/ConfQueue.java
index 8a0d880..3f907fa 100644
--- a/src/main/java/org/imdea/vcd/queue/ConfQueue.java
+++ b/src/main/java/org/imdea/vcd/queue/ConfQueue.java
@@ -1,19 +1,19 @@
package org.imdea.vcd.queue;
+import org.imdea.vcd.queue.box.QueueBox;
+import org.imdea.vcd.queue.clock.Clock;
+import org.imdea.vcd.queue.clock.Dot;
+import org.imdea.vcd.queue.clock.Dots;
+import org.imdea.vcd.queue.clock.ExceptionSet;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import org.imdea.vcd.queue.box.QueueBox;
-import org.imdea.vcd.queue.clock.Clock;
-import org.imdea.vcd.queue.clock.Dot;
-import org.imdea.vcd.queue.clock.Dots;
-import org.imdea.vcd.queue.clock.ExceptionSet;
/**
*
@@ -26,6 +26,7 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> {
private final HashMap<Dot, E> dotToBox = new HashMap<>();
private final HashMap<Dot, Clock<ExceptionSet>> dotToConf = new HashMap<>();
private List<E> toDeliver = new ArrayList<>();
+ E first;
private final Clock<ExceptionSet> committed;
private final Clock<ExceptionSet> delivered;
@@ -139,6 +140,11 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> {
throw new UnsupportedOperationException("Method not supported.");
}
+ @Override
+ public E first() {
+ return first;
+ }
+
@Override
public int size() {
return this.dotToConf.size();
@@ -183,6 +189,8 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> {
// get conf
Clock<ExceptionSet> conf = dotToConf.get(at);
+ first = dotToBox.get(at);
+
// if not all deps are committed, give up
boolean allDepsCommitted = conf.subtractIsBottom(committed);
if (!allDepsCommitted) {
diff --git a/src/main/java/org/imdea/vcd/queue/DepQueue.java b/src/main/java/org/imdea/vcd/queue/DepQueue.java
index b9c34aa..f2d308b 100644
--- a/src/main/java/org/imdea/vcd/queue/DepQueue.java
+++ b/src/main/java/org/imdea/vcd/queue/DepQueue.java
@@ -133,6 +133,11 @@ public class DepQueue<E extends QueueBox> implements Queue<E> {
return result;
}
+ @Override
+ public E first() {
+ return first.item;
+ }
+
/**
* Find element that depends on e.
*/
diff --git a/src/main/java/org/imdea/vcd/queue/Queue.java b/src/main/java/org/imdea/vcd/queue/Queue.java
index 0eba674..8f17b9c 100644
--- a/src/main/java/org/imdea/vcd/queue/Queue.java
+++ b/src/main/java/org/imdea/vcd/queue/Queue.java
@@ -18,6 +18,8 @@ public interface Queue<E extends QueueBox> {
List<E> toList();
+ E first();
+
int size();
int elements();
diff --git a/src/main/java/org/imdea/vcd/queue/RandomQueue.java b/src/main/java/org/imdea/vcd/queue/RandomQueue.java
index 4a1499f..cf108c7 100644
--- a/src/main/java/org/imdea/vcd/queue/RandomQueue.java
+++ b/src/main/java/org/imdea/vcd/queue/RandomQueue.java
@@ -1,11 +1,12 @@
package org.imdea.vcd.queue;
+import org.imdea.vcd.queue.box.QueueBox;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
-import org.imdea.vcd.queue.box.QueueBox;
/**
*
@@ -68,6 +69,11 @@ public class RandomQueue<E extends QueueBox> implements Queue<E> {
throw new UnsupportedOperationException("Method not supported.");
}
+ @Override
+ public E first() {
+ return this.toList().get(0);
+ }
+
@Override
public int size() {
return size;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment