Last active
September 6, 2017 01:01
-
-
Save paul-brebner/bce53b77a7b53e45cfac2f4d4f9c4b6d to your computer and use it in GitHub Desktop.
Regression Analysis example code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test1; | |
import java.time.Instant; | |
import java.time.temporal.ChronoUnit; | |
import java.util.ArrayList; | |
import java.util.Date; | |
import com.datastax.driver.core.*; | |
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; | |
/*
 * Simple experiment that remotely queries a real sandbox Instametrics Cassandra cluster.
 * The example performs a regression analysis on heap-usage metrics.
 */
public class CassTest2 { | |
// One Public IP address for the Cassandra cluster | |
static String n1PubIP = "01.2.34.56"; | |
static String dcName = "instametrics_sandbox"; // this is the DC name you used when created | |
static String user = "your user name"; | |
static String password = "password"; | |
static double[] timesArray; | |
static double[] valuesArray; | |
static double[] objectToDouble(ArrayList<Double> a) | |
{ | |
double[] darray = new double[a.size()]; | |
for (int i=0; i < darray.length; i++) | |
darray[i] = (double)a.get(i); | |
return darray; | |
} | |
// convert times to relative time in seconds from 1st time and return array | |
// Note that after calling this the rs is empty!!! | |
// uses side effect and stores result in timesArray and valuesArray | |
static void relativeTime(ResultSet rs) | |
{ | |
ArrayList<Double> a = new ArrayList<Double>(); | |
ArrayList<Double> b = new ArrayList<Double>(); | |
Instant firstTime = Instant.EPOCH; | |
int j = 0; | |
for (Row row : rs) | |
{ | |
Date d = row.getTimestamp("time"); | |
Instant i = d.toInstant(); | |
if (j == 0) | |
firstTime = i; | |
// relative time calc | |
long seconds = ChronoUnit.SECONDS.between(firstTime, i); | |
a.add((double)seconds); | |
b.add(row.getDouble("metric")); | |
j++; | |
} | |
timesArray = objectToDouble(a); | |
valuesArray = objectToDouble(b); | |
} | |
static void printInstantInterval(Instant firstDate, Instant lastDate) | |
{ | |
long seconds = ChronoUnit.SECONDS.between(firstDate, lastDate); | |
long minutes = ChronoUnit.MINUTES.between(firstDate, lastDate); | |
long hours = ChronoUnit.HOURS.between(firstDate, lastDate); | |
System.out.println("Interval from " + firstDate + " to " + lastDate + " seconds " + seconds + " minutes " + minutes + " hours " + hours); | |
} | |
// use collection counts to find GC events rather than durations | |
// return arrayList of sample start and end times as Instants | |
static ArrayList<Sample> findNextGCCount(ResultSet rs) | |
{ | |
ArrayList<Sample> samples = new ArrayList<Sample>(); | |
long prevd = 0; | |
// range for sampling for gc event | |
Date sampleStartTime = new Date(); | |
Date sampleEndTime = new Date(); | |
Date firstDate = new Date(); | |
Date lastDate = new Date(); | |
int i = 0; | |
long gcCount = 0; | |
long gcTotalTime = 0; | |
for (Row aRow : rs) | |
{ | |
long countNow = (long)aRow.getDouble("metric"); | |
Date time = aRow.getTimestamp("time"); | |
if (i == 0) // first row | |
{ | |
prevd=countNow; | |
sampleEndTime = time; | |
sampleStartTime = time; | |
firstDate = time; | |
} | |
if (countNow > prevd) | |
{ | |
gcCount += (countNow - prevd); | |
// ignore multiple GCs in a row as implies missing data. | |
if ((countNow - prevd) == 1) | |
{ | |
System.out.println("New GC! " + countNow + " sample start=" + sampleStartTime.toInstant() + " sample end=" + sampleEndTime.toInstant()); | |
Sample sample = new Sample(sampleStartTime.toInstant(), sampleEndTime.toInstant()); | |
samples.add(sample); | |
} | |
prevd = countNow; // one or more gcs found so reset | |
sampleStartTime = time; // new sample start time is the timestamp of the newly found gc duration | |
} | |
sampleEndTime = time; // always increment end time | |
lastDate = time; | |
i++; | |
} | |
long seconds = ChronoUnit.SECONDS.between(firstDate.toInstant(), lastDate.toInstant()); | |
long minutes = ChronoUnit.MINUTES.between(firstDate.toInstant(), lastDate.toInstant()); | |
long hours = ChronoUnit.HOURS.between(firstDate.toInstant(), lastDate.toInstant()); | |
Double freqh = (double)gcCount/hours; | |
System.out.println("DEBUG: last time = " + lastDate); | |
System.out.println("gcCount = " + gcCount + " totalGCDuration=" + gcTotalTime + " time range= " + seconds + " gc frequency/h = " + freqh); | |
return samples; | |
} | |
// summary regression stats | |
static double errorSum = 0; | |
static double errorCount = 0; | |
static double gcSum = 0; | |
static double totalCount = 0; | |
public static void regression(double[] x, double[] y) | |
{ | |
int n = 0; | |
int sn = x.length/2; // use only a sample (first 1/2) of the data to emulate real use where a subset of the data is used and prediction made. | |
System.out.println("Linear regression array size " + x.length + " " + y.length + " sampling size " + sn); | |
if (x.length != y.length) | |
{ | |
System.out.println("Error arrays unequal size."); | |
return; | |
} | |
double sumx = 0.0, sumy = 0.0, sumx2 = 0.0; | |
for (int i = 0; i < sn; i++) | |
{ | |
sumx += x[n]; | |
sumx2 += x[n] * x[n]; | |
sumy += y[n]; | |
n++; | |
} | |
double xbar = sumx / n; | |
double ybar = sumy / n; | |
double xxbar = 0.0, yybar = 0.0, xybar = 0.0; | |
for (int i = 0; i < n; i++) { | |
xxbar += (x[i] - xbar) * (x[i] - xbar); | |
yybar += (y[i] - ybar) * (y[i] - ybar); | |
xybar += (x[i] - xbar) * (y[i] - ybar); | |
} | |
double beta1 = xybar / xxbar; | |
double beta0 = ybar - beta1 * xbar; | |
// print function | |
System.out.println("y = " + beta1 + " * x + " + beta0); | |
int df = n - 2; | |
double rss = 0.0; // residual sum of squares | |
double ssr = 0.0; // regression sum of squares | |
for (int i = 0; i < n; i++) { | |
double fit = beta1*x[i] + beta0; | |
rss += (fit - y[i]) * (fit - y[i]); | |
ssr += (fit - ybar) * (fit - ybar); | |
} | |
double R2 = ssr / yybar; | |
double svar = rss / df; | |
double svar1 = svar / xxbar; | |
double svar0 = svar/n + xbar*xbar*svar1; | |
System.out.println("R^2 = " + R2); | |
svar0 = svar * sumx2 / (n * xxbar); | |
double stderr = Math.sqrt(yybar/n); | |
// Compute prediction error for time. Function for x value to check difference in time (y) between prediction and measured. | |
// use last value in array. | |
double yactual = y[y.length-1]; | |
double xpredicted = (yactual - beta0) / beta1; | |
double xactual = x[x.length-1]; | |
double diff = Math.abs(xactual-xpredicted); | |
double perErr = (diff/xactual)*100; | |
totalCount++; | |
// keep track of errors if R2 > 0.5 | |
if (R2 > 0.5) | |
{ | |
errorCount++; | |
if (errorCount > 1) | |
{ | |
errorSum += perErr; | |
gcSum += xactual; | |
} | |
else | |
{ | |
errorSum = perErr; | |
gcSum = xactual; | |
} | |
} | |
double errorAvg = 0; | |
double gcAvg = 0; | |
if (errorCount > 0) | |
{ | |
errorAvg = errorSum/errorCount; | |
gcAvg = gcSum/errorCount; | |
} | |
System.out.println("Time of actual GC " + xactual + " predicted time = " + xpredicted + " diff = " + diff + " Perc Error " + perErr + " avgPerErr = " + errorAvg + " gcAvg = " + gcAvg + " counted " + errorCount + " out of " + totalCount); | |
} | |
public static void main(String[] args) { | |
long t1 = 0; // time each CQL operation, t1 is start time t2 is end time, time is t2-t1 | |
long t2 = 0; | |
long time = 0; | |
t1 = System.currentTimeMillis(); | |
/* try different consistency levels | |
* ONE, TWO, THREE, QUORUM, LOCAL_QUORUM, ALL | |
*/ | |
QueryOptions qos = new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM); | |
Cluster.Builder clusterBuilder = Cluster.builder() | |
.addContactPoints( | |
n1PubIP // provide one public IP | |
) | |
.withQueryOptions(qos) | |
// can't use load balancing with only a single public IP address (with private IP broadcasting turned on for the cluster) | |
//.withLoadBalancingPolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(dcName).build()) // your local data centre | |
.withPort(9042); | |
// .withAuthProvider(new PlainTextAuthProvider(user, password)); | |
Cluster cluster = null; | |
try { | |
cluster = clusterBuilder.build(); | |
Metadata metadata = cluster.getMetadata(); | |
System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); | |
for (Host host: metadata.getAllHosts()) { | |
System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); | |
} | |
Session session = cluster.connect(); | |
ResultSet rs; | |
int useHosts = 10; // max hosts to sample | |
int usedHosts = 0; | |
ResultSet rsHostNames = session.execute("select host from instametrics.host"); | |
for (Row hostN : rsHostNames) | |
{ // next host | |
usedHosts++; | |
if (usedHosts > useHosts) | |
break; | |
System.out.println("PROCESSING HOST NUMBER " + usedHosts + " " + hostN.toString()); | |
String hostName = hostN.getString("host"); | |
rs = session.execute("select * from instametrics.host_service_time2 where host='" + hostName + "' and service='/cassandra/jvm/gc/ConcurrentMarkSweep/collectionCount'"); | |
ArrayList<Sample> samples = findNextGCCount(rs); | |
for (Sample sample: samples) | |
{ | |
Instant i1 = sample.start; | |
Instant i2 = sample.end; | |
long l1 = i1.toEpochMilli(); | |
long l2 = i2.toEpochMilli(); | |
ResultSet rs2 = session.execute("select time, metric from instametrics.host_service_time2 where host='" + hostName + "' and service='/cassandra/jvm/memory/heapMemoryUsage/used' and time>" + l1 + " and time<" + l2); | |
System.out.println("heap mem samples = " + rs2.getAvailableWithoutFetching() + " from " + i1 + " to " + i2); | |
printInstantInterval(i1, i2); | |
// what's min sample size to use? 10? | |
if (rs2.getAvailableWithoutFetching() >= 10) | |
{ | |
relativeTime(rs2); | |
regression(timesArray, valuesArray); | |
} | |
} | |
} | |
t2 = System.currentTimeMillis(); | |
time = t2 - t1; | |
System.out.println("DONE! Total time = " + time); | |
} | |
finally { | |
if (cluster != null) cluster.close(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment