Created
January 9, 2017 08:28
-
-
Save jeffjirsa/7a45af6086899ed9cf4ce4d4a37a5fa5 to your computer and use it in GitHub Desktop.
Faster StreamingHistogram
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 014e09019b14f404898cd9c16384ee94ac293dab Mon Sep 17 00:00:00 2001 | |
From: Jeff Jirsa <jeff@jeffjirsa.net> | |
Date: Sun, 8 Jan 2017 23:21:03 -0800 | |
Subject: [PATCH] POC: Faster StreamingHistogram | |
--- | |
.../org/apache/cassandra/io/sstable/SSTable.java | 3 +- | |
.../io/sstable/metadata/MetadataCollector.java | 2 +- | |
.../apache/cassandra/utils/StreamingHistogram.java | 127 ++++++--- | |
.../test/microbench/StreamingHistogramBench.java | 307 +++++++++++++++++++++ | |
.../cassandra/utils/StreamingHistogramTest.java | 32 ++- | |
5 files changed, 433 insertions(+), 38 deletions(-) | |
create mode 100644 test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java | |
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java | |
index 8556cfa..9c249da 100644 | |
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java | |
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java | |
@@ -59,7 +59,10 @@ public abstract class SSTable | |
{ | |
static final Logger logger = LoggerFactory.getLogger(SSTable.class); | |
- public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100; | |
+ // Maximum number of bins in tombstone drop-time histograms; override with -Dcassandra.streaminghistogram.bins | |
+ public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = Integer.parseInt(System.getProperty("cassandra.streaminghistogram.bins", "100")); | |
+ // Distinct values buffered before they are merged into those bins; override with -Dcassandra.streaminghistogram.spool | |
+ public static final int TOMBSTONE_HISTOGRAM_SPOOL_SIZE = Integer.parseInt(System.getProperty("cassandra.streaminghistogram.spool", "100000")); | |
public final Descriptor descriptor; | |
protected final Set<Component> components; | |
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java | |
index 3b32ae2..683ed4e 100644 | |
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java | |
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java | |
@@ -58,7 +58,7 @@ public class MetadataCollector implements PartitionStatisticsCollector | |
static StreamingHistogram defaultTombstoneDropTimeHistogram() | |
{ | |
- return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); | |
+ return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE, SSTable.TOMBSTONE_HISTOGRAM_SPOOL_SIZE); | |
} | |
public static StatsMetadata defaultStatsMetadata() | |
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java | |
index a500450..5d73bca 100644 | |
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java | |
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java | |
@@ -44,27 +44,38 @@ public class StreamingHistogram | |
// Serialized Histograms always writes with double keys for backwards compatibility | |
private final TreeMap<Number, long[]> bin; | |
+ // Keep a second buffer to spool updates in, before flushing them into `bin` | |
+ private final TreeMap<Number, long[]> spool; | |
+ | |
// maximum bin size for this histogram | |
private final int maxBinSize; | |
+ // number of distinct points the spool may hold before it is flushed into `bin` | |
+ private final int maxSpoolSize; | |
+ | |
/** | |
* Creates a new histogram with max bin size of maxBinSize | |
* @param maxBinSize maximum number of bins this histogram can have | |
+ * @param maxSpoolSize number of distinct points buffered before they are merged into the bins; | |
+ * 0 merges every update into the bins immediately | |
*/ | |
- public StreamingHistogram(int maxBinSize) | |
+ public StreamingHistogram(int maxBinSize, int maxSpoolSize) | |
{ | |
this.maxBinSize = maxBinSize; | |
+ this.maxSpoolSize = maxSpoolSize; | |
bin = new TreeMap<>((o1, o2) -> { | |
if (o1.getClass().equals(o2.getClass())) | |
return ((Comparable)o1).compareTo(o2); | |
else | |
return Double.compare(o1.doubleValue(), o2.doubleValue()); | |
}); | |
+ // Use bin's ordering, so that e.g. 2, 2L and 2.0 accumulate under a single spool entry | |
+ spool = new TreeMap<>(bin.comparator()); | |
} | |
- private StreamingHistogram(int maxBinSize, Map<Double, Long> bin) | |
+ private StreamingHistogram(int maxBinSize, int maxSpoolSize, Map<Double, Long> bin) | |
{ | |
- this(maxBinSize); | |
+ this(maxBinSize, maxSpoolSize); | |
for (Map.Entry<Double, Long> entry : bin.entrySet()) | |
this.bin.put(entry.getKey(), new long[]{entry.getValue()}); | |
} | |
@@ -85,7 +96,7 @@ public class StreamingHistogram | |
*/ | |
public void update(Number p, long m) | |
{ | |
- long[] mi = bin.get(p); | |
+ long[] mi = spool.get(p); | |
if (mi != null) | |
{ | |
// we found the same p so increment that counter | |
@@ -94,38 +105,78 @@ public class StreamingHistogram | |
else | |
{ | |
mi = new long[]{m}; | |
- bin.put(p, mi); | |
- // if bin size exceeds maximum bin size then trim down to max size | |
- while (bin.size() > maxBinSize) | |
+ spool.put(p, mi); | |
+ } | |
+ | |
+ // If the spool has overflowed, flush it into the bins | |
+ if (spool.size() > maxSpoolSize) | |
+ flushHistogram(); | |
+ } | |
+ | |
+ /** | |
+ * Drains the spool into the final bins, merging the two closest bins whenever | |
+ * the number of bins exceeds maxBinSize. The spool is empty afterwards. | |
+ * Deliberately not named finalize(): overriding Object.finalize() would make every | |
+ * instance finalizable and let the finalizer thread call this on unreachable instances. | |
+ */ | |
+ public void flushHistogram() | |
+ { | |
+ if (!spool.isEmpty()) | |
+ { | |
+ // Iterate over the spool, copying each count into the primary bin map | |
+ // and compacting that map as necessary. | |
+ // The cost of compacting `bin` for a spool of M entries and a bin of size N is O(M * N) | |
+ for (Map.Entry<Number, long[]> entry : spool.entrySet()) | |
{ | |
- // find points p1, p2 which have smallest difference | |
- Iterator<Number> keys = bin.keySet().iterator(); | |
- double p1 = keys.next().doubleValue(); | |
- double p2 = keys.next().doubleValue(); | |
- double smallestDiff = p2 - p1; | |
- double q1 = p1, q2 = p2; | |
- while (keys.hasNext()) | |
+ Number key = entry.getKey(); | |
+ long[] spoolValue = entry.getValue(); | |
+ long[] binValue = bin.get(key); | |
+ | |
+ // If this value is already in the final histogram bins, increment its | |
+ // count in place; otherwise insert a new long[1] value | |
+ if (binValue != null) | |
{ | |
- p1 = p2; | |
- p2 = keys.next().doubleValue(); | |
- double diff = p2 - p1; | |
- if (diff < smallestDiff) | |
+ binValue[0] += spoolValue[0]; | |
+ } | |
+ else | |
+ { | |
+ bin.put(key, new long[]{spoolValue[0]}); | |
+ } | |
+ | |
+ // Each entry adds at most one bin, so one merge per entry keeps `bin` within maxBinSize | |
+ if (bin.size() > maxBinSize) | |
+ { | |
+ // find points p1, p2 which have smallest difference | |
+ Iterator<Number> keys = bin.keySet().iterator(); | |
+ double p1 = keys.next().doubleValue(); | |
+ double p2 = keys.next().doubleValue(); | |
+ double smallestDiff = p2 - p1; | |
+ double q1 = p1, q2 = p2; | |
+ while (keys.hasNext()) | |
{ | |
- smallestDiff = diff; | |
- q1 = p1; | |
- q2 = p2; | |
+ p1 = p2; | |
+ p2 = keys.next().doubleValue(); | |
+ double diff = p2 - p1; | |
+ if (diff < smallestDiff) | |
+ { | |
+ smallestDiff = diff; | |
+ q1 = p1; | |
+ q2 = p2; | |
+ } | |
} | |
+ // merge those two | |
+ long[] a1 = bin.remove(q1); | |
+ long[] a2 = bin.remove(q2); | |
+ long k1 = a1[0]; | |
+ long k2 = a2[0]; | |
+ | |
+ a1[0] += k2; | |
+ bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1); | |
} | |
- // merge those two | |
- long[] a1 = bin.remove(q1); | |
- long[] a2 = bin.remove(q2); | |
- long k1 = a1[0]; | |
- long k2 = a2[0]; | |
- | |
- a1[0] += k2; | |
- bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1); | |
} | |
} | |
+ | |
+ spool.clear(); | |
} | |
@@ -138,6 +189,8 @@ public class StreamingHistogram | |
if (other == null) | |
return; | |
+ other.flushHistogram(); | |
+ | |
for (Map.Entry<Number, long[]> entry : other.getAsMap().entrySet()) | |
update(entry.getKey(), entry.getValue()[0]); | |
} | |
@@ -150,6 +203,7 @@ public class StreamingHistogram | |
*/ | |
public double sum(double b) | |
{ | |
+ flushHistogram(); | |
double sum = 0; | |
// find the points pi, pnext which satisfy pi <= b < pnext | |
Map.Entry<Number, long[]> pnext = bin.higherEntry(b); | |
@@ -186,6 +240,9 @@ public class StreamingHistogram | |
{ | |
public void serialize(StreamingHistogram histogram, DataOutputPlus out) throws IOException | |
{ | |
+ histogram.flushHistogram(); | |
+ // NOTE(review): serializedSize() must report the size of exactly what is written here; | |
+ // if it does not flush the spool as well, the two can disagree - confirm. | |
out.writeInt(histogram.maxBinSize); | |
Map<Number, long[]> entries = histogram.getAsMap(); | |
out.writeInt(entries.size()); | |
@@ -206,7 +263,8 @@ public class StreamingHistogram | |
tmp.put(in.readDouble(), in.readLong()); | |
} | |
- return new StreamingHistogram(maxBinSize, tmp); | |
+ // The spool size is not serialized; deserialized histograms spool up to maxBinSize points | |
+ return new StreamingHistogram(maxBinSize, maxBinSize, tmp); | |
} | |
public long serializedSize(StreamingHistogram histogram) | |
@@ -230,13 +288,16 @@ public class StreamingHistogram | |
return false; | |
StreamingHistogram that = (StreamingHistogram) o; | |
- return maxBinSize == that.maxBinSize && bin.equals(that.bin); | |
+ return maxBinSize == that.maxBinSize && | |
+ maxSpoolSize == that.maxSpoolSize && | |
+ spool.equals(that.spool) && | |
+ bin.equals(that.bin); | |
} | |
@Override | |
public int hashCode() | |
{ | |
- return Objects.hashCode(bin.hashCode(), maxBinSize); | |
+ return Objects.hashCode(bin.hashCode(), maxBinSize, maxSpoolSize); | |
} | |
} | |
diff --git a/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java b/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java | |
new file mode 100644 | |
index 0000000..496d483 | |
--- /dev/null | |
+++ b/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java | |
@@ -0,0 +1,255 @@ | |
+/* | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+ | |
+package org.apache.cassandra.test.microbench; | |
+ | |
+import java.util.Random; | |
+import java.util.concurrent.TimeUnit; | |
+ | |
+import org.apache.cassandra.utils.StreamingHistogram; | |
+import org.openjdk.jmh.annotations.*; | |
+ | |
+/** | |
+ * Measures adding 10M points to a StreamingHistogram and flushing it, for several | |
+ * (maxBinSize, maxSpoolSize) combinations and three value distributions: wide (0-86399), | |
+ * narrow (0-14399) and sparse (0-59). A spool size of 0 merges every update into the | |
+ * bins immediately. | |
+ */ | |
+@BenchmarkMode(Mode.AverageTime) | |
+@OutputTimeUnit(TimeUnit.MILLISECONDS) | |
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) | |
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) | |
+@Fork(value = 1) | |
+@Threads(1) | |
+@State(Scope.Benchmark) | |
+public class StreamingHistogramBench | |
+{ | |
+ private static final int SAMPLES = 10000000; | |
+ | |
+ // Fixed seed so that every run measures the same input | |
+ private static final long SEED = 42; | |
+ | |
+ // Seconds in a day | |
+ static final int[] ttls = new int[SAMPLES]; | |
+ // Seconds in 4 hours | |
+ static final int[] narrowttls = new int[SAMPLES]; | |
+ // Seconds in a minute | |
+ static final int[] sparsettls = new int[SAMPLES]; | |
+ | |
+ static | |
+ { | |
+ Random random = new Random(SEED); | |
+ for (int i = 0; i < SAMPLES; i++) | |
+ { | |
+ ttls[i] = random.nextInt(86400); | |
+ narrowttls[i] = random.nextInt(14400); | |
+ sparsettls[i] = random.nextInt(60); | |
+ } | |
+ } | |
+ | |
+ // Histograms fed from ttls | |
+ StreamingHistogram streamingHistogram; | |
+ StreamingHistogram newStreamingHistogram; | |
+ StreamingHistogram newStreamingHistogram2; | |
+ StreamingHistogram newStreamingHistogram3; | |
+ StreamingHistogram newStreamingHistogram4; | |
+ StreamingHistogram newStreamingHistogram5; | |
+ StreamingHistogram newStreamingHistogram6; | |
+ | |
+ // Histograms fed from narrowttls | |
+ StreamingHistogram narrowstreamingHistogram; | |
+ StreamingHistogram narrownewStreamingHistogram; | |
+ StreamingHistogram narrownewStreamingHistogram2; | |
+ StreamingHistogram narrownewStreamingHistogram3; | |
+ StreamingHistogram narrownewStreamingHistogram4; | |
+ StreamingHistogram narrownewStreamingHistogram5; | |
+ StreamingHistogram narrownewStreamingHistogram6; | |
+ | |
+ // Histograms fed from sparsettls | |
+ StreamingHistogram sparsestreamingHistogram; | |
+ StreamingHistogram sparsenewStreamingHistogram; | |
+ StreamingHistogram sparsenewStreamingHistogram2; | |
+ StreamingHistogram sparsenewStreamingHistogram3; | |
+ StreamingHistogram sparsenewStreamingHistogram4; | |
+ StreamingHistogram sparsenewStreamingHistogram5; | |
+ StreamingHistogram sparsenewStreamingHistogram6; | |
+ | |
+ // Re-created before every invocation so each measurement starts from an empty histogram | |
+ // instead of one already filled by earlier invocations (each invocation adds 10M points). | |
+ @Setup(Level.Invocation) | |
+ public void setup() | |
+ { | |
+ streamingHistogram = new StreamingHistogram(100, 0); | |
+ newStreamingHistogram = new StreamingHistogram(100, 1000); | |
+ newStreamingHistogram2 = new StreamingHistogram(100, 10000); | |
+ newStreamingHistogram3 = new StreamingHistogram(100, 100000); | |
+ newStreamingHistogram4 = new StreamingHistogram(50, 100000); | |
+ newStreamingHistogram5 = new StreamingHistogram(50, 10000); | |
+ newStreamingHistogram6 = new StreamingHistogram(100, 1000000); | |
+ | |
+ narrowstreamingHistogram = new StreamingHistogram(100, 0); | |
+ narrownewStreamingHistogram = new StreamingHistogram(100, 1000); | |
+ narrownewStreamingHistogram2 = new StreamingHistogram(100, 10000); | |
+ narrownewStreamingHistogram3 = new StreamingHistogram(100, 100000); | |
+ narrownewStreamingHistogram4 = new StreamingHistogram(50, 100000); | |
+ narrownewStreamingHistogram5 = new StreamingHistogram(50, 10000); | |
+ narrownewStreamingHistogram6 = new StreamingHistogram(100, 1000000); | |
+ | |
+ sparsestreamingHistogram = new StreamingHistogram(100, 0); | |
+ sparsenewStreamingHistogram = new StreamingHistogram(100, 1000); | |
+ sparsenewStreamingHistogram2 = new StreamingHistogram(100, 10000); | |
+ sparsenewStreamingHistogram3 = new StreamingHistogram(100, 100000); | |
+ sparsenewStreamingHistogram4 = new StreamingHistogram(50, 100000); | |
+ sparsenewStreamingHistogram5 = new StreamingHistogram(50, 10000); | |
+ sparsenewStreamingHistogram6 = new StreamingHistogram(100, 1000000); | |
+ } | |
+ | |
+ /** Adds every value to the histogram, then flushes its spool into the bins. */ | |
+ private static void fill(StreamingHistogram histogram, int[] values) | |
+ { | |
+ for (int value : values) | |
+ histogram.update(value); | |
+ histogram.flushHistogram(); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void exitingSH() | |
+ { | |
+ fill(streamingHistogram, ttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void newSH10x() | |
+ { | |
+ fill(newStreamingHistogram, ttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void newSH100x() | |
+ { | |
+ fill(newStreamingHistogram2, ttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void newSH1000x() | |
+ { | |
+ fill(newStreamingHistogram3, ttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void newSH10000x() | |
+ { | |
+ fill(newStreamingHistogram6, ttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void newSH50and1000x() | |
+ { | |
+ fill(newStreamingHistogram4, ttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void newSH50and100x() | |
+ { | |
+ fill(newStreamingHistogram5, ttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void narrowexitingSH() | |
+ { | |
+ fill(narrowstreamingHistogram, narrowttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void narrownewSH10x() | |
+ { | |
+ fill(narrownewStreamingHistogram, narrowttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void narrownewSH100x() | |
+ { | |
+ fill(narrownewStreamingHistogram2, narrowttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void narrownewSH1000x() | |
+ { | |
+ fill(narrownewStreamingHistogram3, narrowttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void narrownewSH10000x() | |
+ { | |
+ fill(narrownewStreamingHistogram6, narrowttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void narrownewSH50and1000x() | |
+ { | |
+ fill(narrownewStreamingHistogram4, narrowttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void narrownewSH50and100x() | |
+ { | |
+ fill(narrownewStreamingHistogram5, narrowttls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void sparseexitingSH() | |
+ { | |
+ fill(sparsestreamingHistogram, sparsettls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void sparsenewSH10x() | |
+ { | |
+ fill(sparsenewStreamingHistogram, sparsettls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void sparsenewSH100x() | |
+ { | |
+ fill(sparsenewStreamingHistogram2, sparsettls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void sparsenewSH1000x() | |
+ { | |
+ fill(sparsenewStreamingHistogram3, sparsettls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void sparsenewSH10000x() | |
+ { | |
+ fill(sparsenewStreamingHistogram6, sparsettls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void sparsenewSH50and1000x() | |
+ { | |
+ fill(sparsenewStreamingHistogram4, sparsettls); | |
+ } | |
+ | |
+ @Benchmark | |
+ public void sparsenewSH50and100x() | |
+ { | |
+ fill(sparsenewStreamingHistogram5, sparsettls); | |
+ } | |
+} | |
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java | |
index 94aac9e..1765000 100644 | |
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java | |
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java | |
@@ -33,7 +33,7 @@ public class StreamingHistogramTest | |
@Test | |
public void testFunction() throws Exception | |
{ | |
- StreamingHistogram hist = new StreamingHistogram(5); | |
+ StreamingHistogram hist = new StreamingHistogram(5, 0); | |
long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45}; | |
// add 7 points to histogram of 5 bins | |
@@ -59,7 +59,7 @@ public class StreamingHistogramTest | |
} | |
// merge test | |
- StreamingHistogram hist2 = new StreamingHistogram(3); | |
+ StreamingHistogram hist2 = new StreamingHistogram(3, 3); | |
for (int i = 7; i < samples.length; i++) | |
{ | |
hist2.update(samples[i]); | |
@@ -89,7 +89,7 @@ public class StreamingHistogramTest | |
@Test | |
public void testSerDe() throws Exception | |
{ | |
- StreamingHistogram hist = new StreamingHistogram(5); | |
+ StreamingHistogram hist = new StreamingHistogram(5, 0); | |
long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9}; | |
// add 7 points to histogram of 5 bins | |
@@ -125,7 +125,7 @@ public class StreamingHistogramTest | |
@Test | |
public void testNumericTypes() throws Exception | |
{ | |
- StreamingHistogram hist = new StreamingHistogram(5); | |
+ StreamingHistogram hist = new StreamingHistogram(5, 0); | |
hist.update(2); | |
hist.update(2.0); | |
@@ -144,9 +144,34 @@ public class StreamingHistogramTest | |
StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes)); | |
deserialized.update(2L); | |
+ deserialized.flushHistogram(); | |
asMap = deserialized.getAsMap(); | |
assertEquals(1, asMap.size()); | |
assertEquals(4L, asMap.get(2)[0]); | |
} | |
+ | |
+ @Test | |
+ public void testOverflow() throws Exception | |
+ { | |
+ StreamingHistogram hist = new StreamingHistogram(5, 10); | |
+ long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 31}; | |
+ | |
+ // Up to maxSpoolSize distinct points stay in the spool; no bins are created yet | |
+ for (int i = 0; i < 10; i++) | |
+ { | |
+ hist.update(samples[i]); | |
+ } | |
+ assertEquals(0, hist.getAsMap().size()); | |
+ | |
+ // One more distinct point overflows the spool and forces it into the 5 bins | |
+ hist.update(samples[10]); | |
+ assertEquals(5, hist.getAsMap().size()); | |
+ | |
+ // Merging bins must not lose any counts | |
+ long total = 0; | |
+ for (long[] count : hist.getAsMap().values()) | |
+ total += count[0]; | |
+ assertEquals(samples.length, total); | |
+ } | |
} | |
-- | |
2.5.4 (Apple Git-61) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[java] # Run complete. Total time: 00:13:46 | |
[java] | |
[java] Benchmark Mode Cnt Score Error Units | |
[java] StreamingHistogramBench.exitingSH avgt 5 9511.690 ± 47.037 ms/op | |
[java] StreamingHistogramBench.narrowexitingSH avgt 5 9216.248 ± 122.733 ms/op | |
[java] StreamingHistogramBench.sparseexitingSH avgt 5 738.502 ± 46.968 ms/op | |
[java] StreamingHistogramBench.newSH10x avgt 5 9504.340 ± 693.669 ms/op | |
[java] StreamingHistogramBench.narrownewSH10x avgt 5 8853.729 ± 48.531 ms/op | |
[java] StreamingHistogramBench.sparsenewSH10x avgt 5 417.739 ± 113.570 ms/op | |
[java] StreamingHistogramBench.newSH100x avgt 5 12058.853 ± 219.759 ms/op | |
[java] StreamingHistogramBench.narrownewSH100x avgt 5 7493.955 ± 97.979 ms/op | |
[java] StreamingHistogramBench.sparsenewSH100x avgt 5 417.045 ± 17.856 ms/op | |
[java] StreamingHistogramBench.newSH1000x avgt 5 2919.360 ± 443.184 ms/op | |
[java] StreamingHistogramBench.narrownewSH1000x avgt 5 1546.174 ± 204.283 ms/op | |
[java] StreamingHistogramBench.sparsenewSH1000x avgt 5 428.587 ± 53.427 ms/op | |
[java] StreamingHistogramBench.newSH10000x avgt 5 2988.654 ± 1041.131 ms/op | |
[java] StreamingHistogramBench.narrownewSH10000x avgt 5 2952.509 ± 1153.165 ms/op | |
[java] StreamingHistogramBench.sparsenewSH10000x avgt 5 2734.421 ± 863.051 ms/op | |
[java] StreamingHistogramBench.newSH50and100x avgt 5 7801.502 ± 1136.165 ms/op | |
[java] StreamingHistogramBench.narrownewSH50and100x avgt 5 5255.424 ± 577.289 ms/op | |
[java] StreamingHistogramBench.sparsenewSH50and100x avgt 5 452.282 ± 86.895 ms/op | |
[java] StreamingHistogramBench.newSH50and1000x avgt 5 2869.132 ± 908.519 ms/op | |
[java] StreamingHistogramBench.narrownewSH50and1000x avgt 5 1493.804 ± 110.359 ms/op | |
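[java] StreamingHistogramBench.sparsenewSH50and1000x avgt 5 469.177 ± 23.305 ms/op |
Note: in the benchmark as originally posted, narrownewSH10000x and sparsenewSH10000x iterate over the wide `ttls` array instead of `narrowttls`/`sparsettls`. Their rows above therefore measure the wide (0-86399) distribution, which likely explains why sparsenewSH10000x is several times slower than the other sparse runs.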
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment