Last active
October 9, 2015 13:51
-
-
Save pavolloffay/fa0415e5eee485ba3970 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Pair { | |
public String first; | |
public JavaDStream<MetricData> second; | |
public Pair(String first, JavaDStream<MetricData> second) { | |
this.first = first; | |
this.second = second; | |
} | |
} | |
/**
 * Wires up the streaming job and blocks until it terminates.
 *
 * <p>Visible steps: create a metric-data stream and a prediction-request stream
 * from their receivers, derive labeled points from the metric data, configure and
 * train a StreamingLinearRegressionWithSGD model on them, register prediction
 * output, then start the streaming context and wait for termination.
 *
 * <p>NOTE(review): this snippet looks like work in progress. It contains a
 * truncated statement ({@code metricDataDStream.tra}) and a no-argument
 * {@code model.trainOn()} call, and several transformation results are never
 * used. See the inline notes below.
 */
@Override | |
public void run() { | |
// Per-metric models keyed by metric id; written both below in the forEach and
// inside a stream lambda further down.
Map<String, StreamingLinearRegressionWithSGD> modelInsideMethod = new HashMap<>(); | |
JavaDStream<MetricData> metricDataDStream = streamingContext.receiverStream(metricDataReceiver); | |
Set<String> scheduleIds = new HashSet<>(); | |
// NOTE(review): the stream returned by map() is discarded and no output operation
// is attached to it. Spark Streaming transformations are presumably lazy, so this
// lambda may never run and scheduleIds may still be empty when it is read right
// below - TODO confirm.
metricDataDStream.map(metricData -> { | |
scheduleIds.add(metricData.getId()); | |
return 1; | |
}); | |
// Builds id -> filtered stream: one filter over the full metric stream per id
// currently present in scheduleIds.
Map<String, JavaDStream<MetricData>> toLearnList = scheduleIds.stream().map(scheduleId -> new Pair | |
(scheduleId, | |
metricDataDStream.filter(metricData -> metricData.getId().equals(scheduleId))) | |
).collect(Collectors.toMap((x) -> x.first, (x) -> x.second)); | |
// For each id: convert its stream to LabeledPoint (label = value, feature vector
// built from the timestamp), create a model, train it on that stream and store it.
toLearnList.entrySet().forEach(entry -> { | |
JavaDStream<LabeledPoint> toLearnForSure = entry.getValue().map(metricData -> | |
new LabeledPoint(metricData.getValue(), Vectors.dense(metricData.getTimestamp())) | |
); | |
// NOTE(review): meaning of the literal constructor arguments (2, 4, 3) is not
// visible here; the model created later in this method uses STEP_SIZE / ITERATIONS.
StreamingLinearRegressionWithSGD streamingLinearRegressionWithSGD = | |
new StreamingLinearRegressionWithSGD(2, 4, 3); | |
streamingLinearRegressionWithSGD.trainOn(toLearnForSure); | |
modelInsideMethod.put(entry.getKey(), streamingLinearRegressionWithSGD); | |
// Logs the whole model map via foreach on the labeled-point stream.
toLearnForSure.foreach(x -> { | |
EngineLogger.LOGGER.debugf("RESULTXXS %s", modelInsideMethod.toString()); | |
return null; | |
}); | |
EngineLogger.LOGGER.debugf("Model for metric %s", entry.getKey()); | |
}); | |
// Keys every metric sample by its id.
JavaPairDStream<String, MetricData> stringMetricDataJavaPairDStream = | |
metricDataDStream.mapToPair((metricData) -> new Tuple2<String, MetricData>(metricData.getId(), | |
metricData)); | |
// NOTE(review): the transform function returns null and the resulting stream is
// discarded - looks like a placeholder; confirm whether it can be removed.
stringMetricDataJavaPairDStream.transform(x -> { | |
return null; | |
}); | |
JavaPairDStream<String, Iterable<MetricData>> stringIterableJavaPairDStream = | |
stringMetricDataJavaPairDStream.groupByKey(); | |
// NOTE(review): toLearn ignores its input (always the same constant LabeledPoint)
// and is not referenced again in the visible code.
JavaDStream<LabeledPoint> toLearn = stringIterableJavaPairDStream.map(v1 -> { | |
return new LabeledPoint(1.0, Vectors.dense(2)); | |
}); | |
JavaDStream<PredictionRequest> predictionRequestDStream = | |
streamingContext.receiverStream(predictionRequestReceiver); | |
// Prints each incoming RDD of prediction requests and its elements to stdout.
predictionRequestDStream.foreachRDD( | |
new Function<JavaRDD<PredictionRequest>, Void>() { | |
@Override | |
public Void call(JavaRDD<PredictionRequest> rdd) { | |
System.out.format("Data from REST %s", rdd.toString()); | |
rdd.foreach(c -> System.out.print(c.toString())); | |
return null; | |
} | |
}); | |
metricDataDStream.cache(); | |
// NOTE(review): the predicate always returns false and 'a' is not used afterwards
// in the visible code.
JavaDStream<MetricData> a = metricDataDStream.filter(x -> { | |
return false; | |
}); | |
List<String> metricIds = new ArrayList<>(); | |
/** | |
* | |
*/ | |
// NOTE(review): the result of this map() is discarded as well, and the lambda
// mutates modelInsideMethod and metricIds, which are plain local collections.
// Whether these mutations are ever executed or visible here depends on Spark's
// lazy evaluation and closure handling - TODO confirm.
metricDataDStream.map(metricData -> { | |
//this works | |
StreamingLinearRegressionWithSGD model = modelInsideMethod.get((metricData.getId())); | |
if (model == null) { | |
// model = getNewModel(); | |
model = new StreamingLinearRegressionWithSGD(STEP_SIZE, ITERATIONS, 1); | |
modelInsideMethod.put(metricData.getId(), model); | |
} | |
metricIds.add(metricData.getId()); | |
return new LabeledPoint(22.2, Vectors.zeros(2)); | |
}); | |
// NOTE(review): another transform whose function returns null and whose result
// is discarded.
metricDataDStream.transform(new Function<JavaRDD<MetricData>, JavaRDD<MetricData>>() { | |
@Override | |
public JavaRDD<MetricData> call(JavaRDD<MetricData> v1) throws Exception { | |
return null; | |
} | |
}); | |
// transform to Labeled point | |
// Label is the metric value; the feature vector is built from the timestamp.
JavaDStream<LabeledPoint> parsedData = metricDataDStream.map(metricData -> { | |
return new LabeledPoint(metricData.getValue(), Vectors.dense(metricData.getTimestamp())); | |
}); | |
parsedData.cache(); | |
// train model | |
// Initial weights are a zero vector of size 1; iterations, step size and a custom
// ANNUpdater are set on the optimizer before training on parsedData.
StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD() | |
.setInitialWeights(Vectors.zeros(1)); | |
model.algorithm().optimizer().setNumIterations(ITERATIONS); | |
model.algorithm().optimizer().setStepSize(STEP_SIZE); | |
// model.algorithm().optimizer().setMiniBatchFraction(1); | |
model.algorithm().optimizer().setUpdater(new ANNUpdater()); | |
model.trainOn(parsedData); | |
// predict | |
// Only the first feature of each request is used to build the prediction vector.
JavaDStream<Vector> predictOn = predictionRequestDStream.map(x -> { | |
// must have same size as we have defined feature size | |
double[] features = new double[]{x.getFeatures().get(0)}; | |
return Vectors.dense(features); | |
}); | |
// NOTE(review): the next statement is truncated in the source and is not valid Java.
metricDataDStream.tra | |
predictOn.print(); | |
JavaDStream<Double> predictionResult = model.predictOn(predictOn); | |
// Placeholder output: prints a fixed string for each prediction batch instead of
// publishing the result.
predictionResult.foreach(x -> { | |
//push result to JMS queue | |
System.out.println("saas"); | |
return null; | |
}); | |
// Also prints predictions for the training features themselves.
model.predictOn(parsedData.map(x -> Vectors.dense(x.features().toArray()))).print(); | |
// NOTE(review): trainOn is called with no argument here, whereas the earlier calls
// pass a stream - presumably a leftover; confirm it compiles or remove it.
model.trainOn(); | |
// Starts processing and blocks this thread until the streaming context terminates.
streamingContext.start(); | |
streamingContext.awaitTermination(); | |
EngineLogger.LOGGER.debug("\n\n\n\n\nStreaming job stopped\n\n\n\n\n"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment