Skip to content

Instantly share code, notes, and snippets.

@otrack
Created October 3, 2018 10:04
Show Gist options
  • Save otrack/6616519e6b26a3795c8cbe66e467882c to your computer and use it in GitHub Desktop.
diff --git a/bin/image.sh b/bin/image.sh
index 40ee452..7a8d7fb 100755
--- a/bin/image.sh
+++ b/bin/image.sh
@@ -7,7 +7,7 @@ else
fi
DIR=$(dirname "$0")
-IMAGE=0track/vcd-java-client:${TAG}
+IMAGE=vitorenesduarte/vcd-java-client:${TAG}
DOCKERFILE=${DIR}/../Dockerfiles/vcd-java-client
# release vcd-java-client
diff --git a/src/main/java/org/imdea/vcd/DataRW.java b/src/main/java/org/imdea/vcd/DataRW.java
index ea9cf2a..6b6eb55 100644
--- a/src/main/java/org/imdea/vcd/DataRW.java
+++ b/src/main/java/org/imdea/vcd/DataRW.java
@@ -6,21 +6,12 @@ import com.codahale.metrics.MetricAttribute;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.protobuf.ByteString;
-import org.imdea.vcd.pb.Proto.Commit;
import org.imdea.vcd.pb.Proto.Message;
import org.imdea.vcd.pb.Proto.MessageSet;
import org.imdea.vcd.pb.Proto.Reply;
-import org.imdea.vcd.queue.ConfQueue;
-import org.imdea.vcd.queue.DepQueue;
-import org.imdea.vcd.queue.Queue;
-import org.imdea.vcd.queue.QueueAddArgs;
-import org.imdea.vcd.queue.QueueType;
-import org.imdea.vcd.queue.RandomQueue;
import org.imdea.vcd.queue.box.CommittedQueueBox;
+import org.imdea.vcd.queue.DepQueue;
import org.imdea.vcd.queue.clock.Clock;
-import org.imdea.vcd.queue.clock.Dot;
-import org.imdea.vcd.queue.clock.ExceptionSet;
-import org.imdea.vcd.queue.clock.MaxInt;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -31,11 +22,19 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.imdea.vcd.pb.Proto.Commit;
+import org.imdea.vcd.queue.ConfQueue;
+import org.imdea.vcd.queue.Queue;
+import org.imdea.vcd.queue.QueueAddArgs;
+import org.imdea.vcd.queue.QueueType;
+import org.imdea.vcd.queue.RandomQueue;
+import org.imdea.vcd.queue.clock.Dot;
+import org.imdea.vcd.queue.clock.ExceptionSet;
+import org.imdea.vcd.queue.clock.MaxInt;
/**
*
@@ -69,7 +68,6 @@ public class DataRW {
private final LinkedBlockingQueue<MessageSet> toWriter;
private final LinkedBlockingQueue<Optional<MessageSet>> toClient;
- private final Set<ByteString> chains;
private final Writer writer;
private final SocketReader socketReader;
@@ -79,9 +77,8 @@ public class DataRW {
this.batchWait = config.getBatchWait();
this.toWriter = new LinkedBlockingQueue<>();
this.toClient = new LinkedBlockingQueue<>();
- this.chains = ConcurrentHashMap.newKeySet();
this.writer = new Writer(this.out, this.toWriter, this.batchWait);
- this.socketReader = new SocketReader(this.in, this.toClient, this.chains, config);
+ this.socketReader = new SocketReader(this.in, this.toClient, config);
}
public void start() {
@@ -114,20 +111,6 @@ public class DataRW {
if (this.batchWait > 0) {
toWriter.put(messageSet);
} else {
- boolean canWrite;
- do {
- canWrite = true;
- mainLoop:
- for (Message message : messageSet.getMessagesList()) {
- for (ByteString hash : message.getHashesList()) {
- if (chains.contains(hash)){
- canWrite = false;
- break mainLoop;
- }
- }
- }
- if (!canWrite) Thread.sleep(1);
- } while (!canWrite);
doWrite(messageSet, this.out);
}
}
@@ -225,12 +208,12 @@ public class DataRW {
private final LinkedBlockingQueue<Reply> toDeliverer;
private final Deliverer deliverer;
- public SocketReader(DataInputStream in, LinkedBlockingQueue<Optional<MessageSet>> toClient, Set<ByteString> chains, Config config) {
+ public SocketReader(DataInputStream in, LinkedBlockingQueue<Optional<MessageSet>> toClient, Config config) {
this.in = in;
this.toClient = toClient;
this.batchWait = config.getBatchWait();
this.toDeliverer = new LinkedBlockingQueue<>();
- this.deliverer = new Deliverer(this.toClient, chains, this.toDeliverer, config);
+ this.deliverer = new Deliverer(this.toClient, this.toDeliverer, config);
}
public void close() {
@@ -298,7 +281,6 @@ public class DataRW {
private final Sorter sorter;
private final QueueType queueType;
private Queue<CommittedQueueBox> queue;
- private final Set<ByteString> chains;
// metrics
private final Timer toAdd;
@@ -307,7 +289,7 @@ public class DataRW {
private final Histogram queueSize;
private final Histogram queueElements;
- public Deliverer(LinkedBlockingQueue<Optional<MessageSet>> toClient, Set<ByteString> chains,LinkedBlockingQueue<Reply> toDeliverer, Config config) {
+ public Deliverer(LinkedBlockingQueue<Optional<MessageSet>> toClient, LinkedBlockingQueue<Reply> toDeliverer, Config config) {
createBox = METRICS.timer(MetricRegistry.name(DataRW.class, "createBox"));
toAdd = METRICS.timer(MetricRegistry.name(DataRW.class, "toAdd"));
@@ -320,7 +302,6 @@ public class DataRW {
this.toSorter = new LinkedBlockingQueue<>();
this.sorter = new Sorter(toClient, this.toSorter, config);
this.queueType = config.getQueueType();
- this.chains = chains;
}
public void close() {
@@ -373,14 +354,7 @@ public class DataRW {
queueElements.update(queue.elements());
if (!toDeliver.isEmpty()) {
- for (Message message : queue.first().getMessages()) {
- chains.removeAll(message.getHashesList());
- }
toSorter.put(toDeliver);
- }else if (queue.size() > 0) {
- for (Message message : queue.first().getMessages()) {
- chains.addAll(message.getHashesList());
- }
}
break;
default:
diff --git a/src/main/java/org/imdea/vcd/queue/ConfQueue.java b/src/main/java/org/imdea/vcd/queue/ConfQueue.java
index 3f907fa..8a0d880 100644
--- a/src/main/java/org/imdea/vcd/queue/ConfQueue.java
+++ b/src/main/java/org/imdea/vcd/queue/ConfQueue.java
@@ -1,19 +1,19 @@
package org.imdea.vcd.queue;
-import org.imdea.vcd.queue.box.QueueBox;
-import org.imdea.vcd.queue.clock.Clock;
-import org.imdea.vcd.queue.clock.Dot;
-import org.imdea.vcd.queue.clock.Dots;
-import org.imdea.vcd.queue.clock.ExceptionSet;
-
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import org.imdea.vcd.queue.box.QueueBox;
+import org.imdea.vcd.queue.clock.Clock;
+import org.imdea.vcd.queue.clock.Dot;
+import org.imdea.vcd.queue.clock.Dots;
+import org.imdea.vcd.queue.clock.ExceptionSet;
/**
*
@@ -26,7 +26,6 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> {
private final HashMap<Dot, E> dotToBox = new HashMap<>();
private final HashMap<Dot, Clock<ExceptionSet>> dotToConf = new HashMap<>();
private List<E> toDeliver = new ArrayList<>();
- E first;
private final Clock<ExceptionSet> committed;
private final Clock<ExceptionSet> delivered;
@@ -140,11 +139,6 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> {
throw new UnsupportedOperationException("Method not supported.");
}
- @Override
- public E first() {
- return first;
- }
-
@Override
public int size() {
return this.dotToConf.size();
@@ -189,8 +183,6 @@ public class ConfQueue<E extends QueueBox> implements Queue<E> {
// get conf
Clock<ExceptionSet> conf = dotToConf.get(at);
- first = dotToBox.get(at);
-
// if not all deps are committed, give up
boolean allDepsCommitted = conf.subtractIsBottom(committed);
if (!allDepsCommitted) {
diff --git a/src/main/java/org/imdea/vcd/queue/DepQueue.java b/src/main/java/org/imdea/vcd/queue/DepQueue.java
index f2d308b..b9c34aa 100644
--- a/src/main/java/org/imdea/vcd/queue/DepQueue.java
+++ b/src/main/java/org/imdea/vcd/queue/DepQueue.java
@@ -133,11 +133,6 @@ public class DepQueue<E extends QueueBox> implements Queue<E> {
return result;
}
- @Override
- public E first() {
- return first.item;
- }
-
/**
* Find element that depends on e.
*/
diff --git a/src/main/java/org/imdea/vcd/queue/Queue.java b/src/main/java/org/imdea/vcd/queue/Queue.java
index 8f17b9c..0eba674 100644
--- a/src/main/java/org/imdea/vcd/queue/Queue.java
+++ b/src/main/java/org/imdea/vcd/queue/Queue.java
@@ -18,8 +18,6 @@ public interface Queue<E extends QueueBox> {
List<E> toList();
- E first();
-
int size();
int elements();
diff --git a/src/main/java/org/imdea/vcd/queue/RandomQueue.java b/src/main/java/org/imdea/vcd/queue/RandomQueue.java
index cf108c7..4a1499f 100644
--- a/src/main/java/org/imdea/vcd/queue/RandomQueue.java
+++ b/src/main/java/org/imdea/vcd/queue/RandomQueue.java
@@ -1,12 +1,11 @@
package org.imdea.vcd.queue;
-import org.imdea.vcd.queue.box.QueueBox;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import org.imdea.vcd.queue.box.QueueBox;
/**
*
@@ -69,11 +68,6 @@ public class RandomQueue<E extends QueueBox> implements Queue<E> {
throw new UnsupportedOperationException("Method not supported.");
}
- @Override
- public E first() {
- return this.toList().get(0);
- }
-
@Override
public int size() {
return size;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment