Skip to content

Instantly share code, notes, and snippets.

@paul-brebner
Last active September 6, 2017 01:01
Show Gist options
  • Save paul-brebner/bce53b77a7b53e45cfac2f4d4f9c4b6d to your computer and use it in GitHub Desktop.
Save paul-brebner/bce53b77a7b53e45cfac2f4d4f9c4b6d to your computer and use it in GitHub Desktop.
Regression Analysis example code
package test1;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
/*
* Simple experiment that remotely queries a real Instametrics sandbox Cassandra cluster.
* The example performs a regression analysis on heap-usage metrics.
*/
/**
 * Reads garbage-collection counts and heap-usage samples from the {@code instametrics}
 * keyspace and fits a straight line to the heap usage observed between collections.
 *
 * <p>For each host (up to {@link #MAX_HOSTS}) the program:
 * <ol>
 *   <li>reads the collection-count metric and derives the time windows that end just before a
 *       single new collection is reported ({@link #findNextGCCount(ResultSet)});</li>
 *   <li>reads the heap-used samples inside each window and converts them to arrays
 *       ({@link #relativeTime(ResultSet)});</li>
 *   <li>fits a least-squares line to the first half of those samples and reports how far the
 *       fitted line is from the last sample ({@link #regression(double[], double[])}).</li>
 * </ol>
 */
public class CassTest2 {
    // One public IP address for the Cassandra cluster
    static String n1PubIP = "01.2.34.56";
    static String dcName = "instametrics_sandbox"; // this is the DC name you used when created
    static String user = "your user name";
    static String password = "password";

    // Results of the most recent relativeTime() call (written as a side effect).
    static double[] timesArray;
    static double[] valuesArray;

    // Summary regression stats, accumulated across regression() calls.
    static double errorSum = 0;
    static double errorCount = 0;
    static double gcSum = 0;
    static double totalCount = 0;

    private static final String GC_COUNT_SERVICE =
            "/cassandra/jvm/gc/ConcurrentMarkSweep/collectionCount";
    private static final String HEAP_USED_SERVICE =
            "/cassandra/jvm/memory/heapMemoryUsage/used";
    private static final int MAX_HOSTS = 10;   // max hosts to sample
    private static final int MIN_SAMPLES = 10; // what's min sample size to use? 10?

    /**
     * Copies a list of boxed doubles into a primitive array.
     *
     * @param a values to copy; must not contain {@code null} elements
     * @return a new array with the same length and element order as {@code a}
     */
    static double[] objectToDouble(ArrayList<Double> a) {
        double[] result = new double[a.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = a.get(i);
        }
        return result;
    }

    /**
     * Converts the rows of {@code rs} into two parallel arrays stored in {@link #timesArray}
     * and {@link #valuesArray}.
     *
     * <p>Each time is the whole number of seconds between the first row's {@code time} column
     * and the current row's; each value is the row's {@code metric} column. Iterating consumes
     * the result set, so it is empty after this call.
     *
     * @param rs rows providing a {@code time} timestamp and a {@code metric} double
     */
    static void relativeTime(ResultSet rs) {
        ArrayList<Double> times = new ArrayList<>();
        ArrayList<Double> values = new ArrayList<>();
        Instant firstTime = null;
        for (Row row : rs) {
            Instant rowTime = row.getTimestamp("time").toInstant();
            if (firstTime == null) {
                firstTime = rowTime;
            }
            times.add((double) ChronoUnit.SECONDS.between(firstTime, rowTime));
            values.add(row.getDouble("metric"));
        }
        timesArray = objectToDouble(times);
        valuesArray = objectToDouble(values);
    }

    /**
     * Prints the length of the interval between two instants in whole seconds, minutes and hours.
     *
     * @param firstDate start of the interval
     * @param lastDate  end of the interval
     */
    static void printInstantInterval(Instant firstDate, Instant lastDate) {
        long seconds = ChronoUnit.SECONDS.between(firstDate, lastDate);
        long minutes = ChronoUnit.MINUTES.between(firstDate, lastDate);
        long hours = ChronoUnit.HOURS.between(firstDate, lastDate);
        System.out.println("Interval from " + firstDate + " to " + lastDate + " seconds " + seconds + " minutes " + minutes + " hours " + hours);
    }

    /**
     * Uses collection counts (rather than durations) to find GC events.
     *
     * <p>Walks the rows in iteration order, comparing each row's {@code metric} (truncated to a
     * {@code long}) with the previously seen count. Every increase is added to the total; an
     * increase of exactly one additionally produces a {@code Sample} running from the row where
     * the previous change was seen to the row just before the current one. Larger jumps are
     * treated as missing data and produce no sample. A summary line is printed at the end.
     *
     * @param rs rows providing a {@code time} timestamp and a {@code metric} double
     * @return the sample windows found, in the order they were encountered
     */
    static ArrayList<Sample> findNextGCCount(ResultSet rs) {
        ArrayList<Sample> samples = new ArrayList<>();
        // Defaults only matter when the result set is empty.
        Date now = new Date();
        Date sampleStartTime = now;
        Date sampleEndTime = now;
        Date firstDate = now;
        Date lastDate = now;
        boolean firstRow = true;
        long previousCount = 0;
        long gcCount = 0;
        // Durations are not tracked here; kept so the summary line keeps its format.
        final long gcTotalTime = 0;

        for (Row row : rs) {
            long countNow = (long) row.getDouble("metric");
            Date time = row.getTimestamp("time");
            if (firstRow) {
                previousCount = countNow;
                sampleStartTime = time;
                sampleEndTime = time;
                firstDate = time;
                firstRow = false;
            }
            if (countNow > previousCount) {
                long increase = countNow - previousCount;
                gcCount += increase;
                // ignore multiple GCs in a row as implies missing data.
                if (increase == 1) {
                    System.out.println("New GC! " + countNow + " sample start=" + sampleStartTime.toInstant() + " sample end=" + sampleEndTime.toInstant());
                    samples.add(new Sample(sampleStartTime.toInstant(), sampleEndTime.toInstant()));
                }
                previousCount = countNow;  // one or more gcs found so reset
                sampleStartTime = time;    // next sample starts at the row that reported the gc
            } else if (countNow < previousCount) {
                // The count went backwards (presumably the monitored JVM restarted and its
                // counter started again). Without resetting the baseline, later collections
                // would be missed and the next sample would span the restart.
                previousCount = countNow;
                sampleStartTime = time;
            }
            sampleEndTime = time; // always advance end time
            lastDate = time;
        }

        long seconds = ChronoUnit.SECONDS.between(firstDate.toInstant(), lastDate.toInstant());
        // Fractional hours: whole-hour truncation overstates the rate and divides by zero
        // for ranges shorter than an hour.
        double hours = seconds / 3600.0;
        double freqh = hours > 0 ? gcCount / hours : 0.0;
        System.out.println("DEBUG: last time = " + lastDate);
        System.out.println("gcCount = " + gcCount + " totalGCDuration=" + gcTotalTime + " time range= " + seconds + " gc frequency/h = " + freqh);
        return samples;
    }

    /**
     * Fits a least-squares line {@code y = beta1 * x + beta0} to the first half of the data and
     * checks how well it predicts the x position of the last point.
     *
     * <p>Only the first {@code x.length / 2} points are used for the fit, to emulate real use
     * where a prediction is made from a subset of the data. The fitted line is then inverted at
     * the last y value and the predicted x is compared with the last x value. When
     * {@code R^2 > 0.5} and the percentage error is finite, the error and the actual x are added
     * to the running totals ({@link #errorSum}, {@link #gcSum}, {@link #errorCount}).
     * {@link #totalCount} counts every call with equally sized arrays.
     *
     * <p>Nothing is fitted (an error line is printed instead) when the arrays differ in length,
     * when the sampled half has fewer than two points, or when all sampled x values are equal.
     *
     * @param x independent values (relative times)
     * @param y dependent values, same length as {@code x}
     */
    public static void regression(double[] x, double[] y) {
        int n = x.length / 2;
        System.out.println("Linear regression array size " + x.length + " " + y.length + " sampling size " + n);
        if (x.length != y.length) {
            System.out.println("Error arrays unequal size.");
            return;
        }
        totalCount++;
        if (n < 2) {
            System.out.println("Error sample too small for regression.");
            return;
        }

        double sumx = 0.0;
        double sumy = 0.0;
        for (int i = 0; i < n; i++) {
            sumx += x[i];
            sumy += y[i];
        }
        double xbar = sumx / n;
        double ybar = sumy / n;

        double xxbar = 0.0;
        double yybar = 0.0;
        double xybar = 0.0;
        for (int i = 0; i < n; i++) {
            double dx = x[i] - xbar;
            double dy = y[i] - ybar;
            xxbar += dx * dx;
            yybar += dy * dy;
            xybar += dx * dy;
        }
        if (xxbar == 0.0) {
            // The slope would be 0/0: a line cannot be fitted through one x position.
            System.out.println("Error all sampled x values are identical.");
            return;
        }
        double beta1 = xybar / xxbar;
        double beta0 = ybar - beta1 * xbar;
        // print function
        System.out.println("y = " + beta1 + " * x + " + beta0);

        double ssr = 0.0; // regression sum of squares
        for (int i = 0; i < n; i++) {
            double fit = beta1 * x[i] + beta0;
            ssr += (fit - ybar) * (fit - ybar);
        }
        double R2 = ssr / yybar;
        System.out.println("R^2 = " + R2);

        // Prediction error: invert the line at the last measured y and compare the
        // resulting x with the last measured x.
        double yactual = y[y.length - 1];
        double xpredicted = (yactual - beta0) / beta1;
        double xactual = x[x.length - 1];
        double diff = Math.abs(xactual - xpredicted);
        double perErr = (diff / xactual) * 100;

        // keep track of errors if R2 > 0.5; a non-finite error would poison the average
        if (R2 > 0.5 && Double.isFinite(perErr)) {
            errorCount++;
            errorSum += perErr;
            gcSum += xactual;
        }
        double errorAvg = 0;
        double gcAvg = 0;
        if (errorCount > 0) {
            errorAvg = errorSum / errorCount;
            gcAvg = gcSum / errorCount;
        }
        System.out.println("Time of actual GC " + xactual + " predicted time = " + xpredicted + " diff = " + diff + " Perc Error " + perErr + " avgPerErr = " + errorAvg + " gcAvg = " + gcAvg + " counted " + errorCount + " out of " + totalCount);
    }

    /**
     * Connects to the cluster, prints its topology, analyses up to {@link #MAX_HOSTS} hosts and
     * prints the total elapsed time in milliseconds.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        long startMillis = System.currentTimeMillis();
        /* try different consistency levels
         * ONE, TWO, THREE, QUORUM, LOCAL_QUORUM, ALL
         */
        QueryOptions qos = new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM);
        Cluster.Builder clusterBuilder = Cluster.builder()
                .addContactPoints(
                        n1PubIP // provide one public IP
                )
                .withQueryOptions(qos)
                // can't use load balancing with only a single public IP address (with private IP broadcasting turned on for the cluster)
                //.withLoadBalancingPolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(dcName).build()) // your local data centre
                .withPort(9042);
        // .withAuthProvider(new PlainTextAuthProvider(user, password));

        // try-with-resources closes the session and the cluster even when a query fails.
        try (Cluster cluster = clusterBuilder.build()) {
            Metadata metadata = cluster.getMetadata();
            System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
            for (Host host : metadata.getAllHosts()) {
                System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
            }
            try (Session session = cluster.connect()) {
                analyseHosts(session);
            }
            long time = System.currentTimeMillis() - startMillis;
            System.out.println("DONE! Total time = " + time);
        }
    }

    // Runs the per-host analysis for the first MAX_HOSTS hosts listed in instametrics.host.
    private static void analyseHosts(Session session) {
        // Bound values instead of string concatenation: a quote in a host name can no longer
        // break the query, and each statement is parsed only once.
        PreparedStatement gcCountQuery = session.prepare(
                "select * from instametrics.host_service_time2 where host=? and service=?");
        PreparedStatement heapUsedQuery = session.prepare(
                "select time, metric from instametrics.host_service_time2 where host=? and service=? and time>? and time<?");

        int usedHosts = 0;
        for (Row hostRow : session.execute("select host from instametrics.host")) {
            usedHosts++;
            if (usedHosts > MAX_HOSTS) {
                break;
            }
            System.out.println("PROCESSING HOST NUMBER " + usedHosts + " " + hostRow.toString());
            analyseHost(session, hostRow.getString("host"), gcCountQuery, heapUsedQuery);
        }
    }

    // Finds the GC sample windows for one host and runs a regression on the heap samples in each.
    private static void analyseHost(Session session, String hostName,
                                    PreparedStatement gcCountQuery, PreparedStatement heapUsedQuery) {
        ResultSet gcRows = session.execute(gcCountQuery.bind(hostName, GC_COUNT_SERVICE));
        for (Sample sample : findNextGCCount(gcRows)) {
            Instant from = sample.start;
            Instant to = sample.end;
            ResultSet heapRows = session.execute(
                    heapUsedQuery.bind(hostName, HEAP_USED_SERVICE, Date.from(from), Date.from(to)));
            int available = heapRows.getAvailableWithoutFetching();
            System.out.println("heap mem samples = " + available + " from " + from + " to " + to);
            printInstantInterval(from, to);
            if (available >= MIN_SAMPLES) {
                relativeTime(heapRows);
                regression(timesArray, valuesArray);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment