Skip to content

Instantly share code, notes, and snippets.

@pavolloffay pavolloffay/spark
Last active Oct 9, 2015

Embed
What would you like to do?
/**
 * Immutable two-element holder associating a string key with a stream of {@code MetricData}.
 *
 * <p>{@code run()} uses it as a temporary carrier while building a map from an id to the
 * sub-stream filtered down to that id. Both fields are assigned exactly once in the
 * constructor and are only read afterwards, so they are declared {@code final}.
 */
public class Pair {
    /** Key of the pair (an id string when created from {@code run()}). */
    public final String first;

    /** Stream associated with {@link #first}. */
    public final JavaDStream<MetricData> second;

    /**
     * Creates a pair; neither argument is validated or copied.
     *
     * @param first  key to store
     * @param second stream to store alongside the key
     */
    public Pair(String first, JavaDStream<MetricData> second) {
        this.first = first;
        this.second = second;
    }
}
/**
 * Sets up the streaming pipeline and then blocks until the streaming context terminates.
 *
 * <p>In order, the method: opens a receiver stream of {@code MetricData}; tries to build one
 * regression model per metric id; opens a second receiver stream of {@code PredictionRequest};
 * trains a single global {@code StreamingLinearRegressionWithSGD} on (value, timestamp) points;
 * runs predictions against that model; and finally starts the context.
 *
 * <p>NOTE(review): this reads like an experiment in progress. Several transformation results
 * are discarded, there is a dangling statement fragment and an argument-less {@code trainOn()}
 * call further down, and the method's closing brace is not present in this snippet.
 */
@Override
public void run() {
// Per-metric models keyed by metric id; filled both in the forEach below and inside a map closure.
Map<String, StreamingLinearRegressionWithSGD> modelInsideMethod = new HashMap<>();
// Source stream of metric data points coming from the custom receiver.
JavaDStream<MetricData> metricDataDStream = streamingContext.receiverStream(metricDataReceiver);
Set<String> scheduleIds = new HashSet<>();
// Attempts to collect every seen metric id into scheduleIds as a side effect of map().
// NOTE(review): the mapped stream is discarded and scheduleIds is read on the very next
// statement, before streamingContext.start() is called — presumably the set is still empty
// at that point, which would leave toLearnList empty. Confirm against Spark's lazy evaluation.
metricDataDStream.map(metricData -> {
scheduleIds.add(metricData.getId());
return 1;
});
// For every collected id, build a sub-stream containing only that id's data, keyed by the id.
Map<String, JavaDStream<MetricData>> toLearnList = scheduleIds.stream().map(scheduleId -> new Pair
(scheduleId,
metricDataDStream.filter(metricData -> metricData.getId().equals(scheduleId)))
).collect(Collectors.toMap((x) -> x.first, (x) -> x.second));
// One model per id: convert to labeled points, train on them, and remember the model.
toLearnList.entrySet().forEach(entry -> {
// Label = metric value, single feature = timestamp.
JavaDStream<LabeledPoint> toLearnForSure = entry.getValue().map(metricData ->
new LabeledPoint(metricData.getValue(), Vectors.dense(metricData.getTimestamp()))
);
// NOTE(review): argument order is presumably (stepSize, numIterations, miniBatchFraction),
// judging by the later call that passes STEP_SIZE and ITERATIONS — confirm.
StreamingLinearRegressionWithSGD streamingLinearRegressionWithSGD =
new StreamingLinearRegressionWithSGD(2, 4, 3);
streamingLinearRegressionWithSGD.trainOn(toLearnForSure);
modelInsideMethod.put(entry.getKey(), streamingLinearRegressionWithSGD);
// Debug output only: logs the whole model map for each element passed to foreach.
toLearnForSure.foreach(x -> {
EngineLogger.LOGGER.debugf("RESULTXXS %s", modelInsideMethod.toString());
return null;
});
EngineLogger.LOGGER.debugf("Model for metric %s", entry.getKey());
});
// Alternative approach: key the stream by metric id.
JavaPairDStream<String, MetricData> stringMetricDataJavaPairDStream =
metricDataDStream.mapToPair((metricData) -> new Tuple2<String, MetricData>(metricData.getId(),
metricData));
// NOTE(review): placeholder — the function returns null and the resulting stream is discarded.
stringMetricDataJavaPairDStream.transform(x -> {
return null;
});
// Group data points by metric id.
JavaPairDStream<String, Iterable<MetricData>> stringIterableJavaPairDStream =
stringMetricDataJavaPairDStream.groupByKey();
// NOTE(review): maps every group to the same constant point, and toLearn is not referenced
// again in the visible code.
JavaDStream<LabeledPoint> toLearn = stringIterableJavaPairDStream.map(v1 -> {
return new LabeledPoint(1.0, Vectors.dense(2));
});
// Second source stream: prediction requests (the log text below says they come from REST).
JavaDStream<PredictionRequest> predictionRequestDStream =
streamingContext.receiverStream(predictionRequestReceiver);
// Debug output: print each incoming batch and each request in it to stdout.
predictionRequestDStream.foreachRDD(
new Function<JavaRDD<PredictionRequest>, Void>() {
@Override
public Void call(JavaRDD<PredictionRequest> rdd) {
System.out.format("Data from REST %s", rdd.toString());
rdd.foreach(c -> System.out.print(c.toString()));
return null;
}
});
metricDataDStream.cache();
// NOTE(review): the predicate always returns false and `a` is not referenced again in the
// visible code.
JavaDStream<MetricData> a = metricDataDStream.filter(x -> {
return false;
});
List<String> metricIds = new ArrayList<>();
/*
 * Another attempt at per-metric models: look up (or lazily create) the model for each data
 * point's id inside the map closure, and record the id in metricIds.
 * NOTE(review): the mapped stream is discarded, and the closure mutates modelInsideMethod and
 * metricIds, which are plain local collections of this method — verify that this has the
 * intended effect when the closure is executed by Spark.
 */
metricDataDStream.map(metricData -> {
//this works
StreamingLinearRegressionWithSGD model = modelInsideMethod.get((metricData.getId()));
if (model == null) {
// model = getNewModel();
model = new StreamingLinearRegressionWithSGD(STEP_SIZE, ITERATIONS, 1);
modelInsideMethod.put(metricData.getId(), model);
}
metricIds.add(metricData.getId());
// Constant placeholder result; the actual data point is not used for the label or features.
return new LabeledPoint(22.2, Vectors.zeros(2));
});
// NOTE(review): placeholder — the function returns null and the resulting stream is discarded.
metricDataDStream.transform(new Function<JavaRDD<MetricData>, JavaRDD<MetricData>>() {
@Override
public JavaRDD<MetricData> call(JavaRDD<MetricData> v1) throws Exception {
return null;
}
});
// transform to Labeled point (label = metric value, single feature = timestamp)
JavaDStream<LabeledPoint> parsedData = metricDataDStream.map(metricData -> {
return new LabeledPoint(metricData.getValue(), Vectors.dense(metricData.getTimestamp()));
});
parsedData.cache();
// train model: one global model with a single weight, matching the single timestamp feature
StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.zeros(1));
model.algorithm().optimizer().setNumIterations(ITERATIONS);
model.algorithm().optimizer().setStepSize(STEP_SIZE);
// model.algorithm().optimizer().setMiniBatchFraction(1);
model.algorithm().optimizer().setUpdater(new ANNUpdater());
model.trainOn(parsedData);
// predict: turn each request into a one-element feature vector (only the first feature is used)
JavaDStream<Vector> predictOn = predictionRequestDStream.map(x -> {
// must have same size as we have defined feature size
double[] features = new double[]{x.getFeatures().get(0)};
return Vectors.dense(features);
});
// NOTE(review): the next line is an unfinished statement fragment and will not compile as is.
metricDataDStream.tra
predictOn.print();
JavaDStream<Double> predictionResult = model.predictOn(predictOn);
// NOTE(review): placeholder — prints a fixed string and ignores the prediction value x.
predictionResult.foreach(x -> {
//push result to JMS queue
System.out.println("saas");
return null;
});
// Also print the model's predictions for the training points themselves.
model.predictOn(parsedData.map(x -> Vectors.dense(x.features().toArray()))).print();
// NOTE(review): trainOn is called with a stream argument everywhere else in this method; this
// argument-less call looks like a leftover and presumably does not compile — confirm.
model.trainOn();
// Start processing and block this thread until the streaming context terminates.
streamingContext.start();
streamingContext.awaitTermination();
EngineLogger.LOGGER.debug("\n\n\n\n\nStreaming job stopped\n\n\n\n\n");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.