This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Keyspace for the activity-recognition demo. SimpleStrategy with a
-- replication factor of 1 is a single-node development setting only.
CREATE KEYSPACE actitracker WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };

-- One row per accelerometer sample. Partitioned by (user_id, activity) and
-- clustered by timestamp, so each (user, activity) bucket reads back in
-- ascending time order.
CREATE TABLE users (user_id int,activity text,timestamp bigint,acc_x double,acc_y double,acc_z double, PRIMARY KEY ((user_id,activity),timestamp));

-- Bulk-load the CSV export; the first line of the file is a header.
COPY users FROM '/path_to_your_data/data.csv' WITH HEADER = true;

-- Sanity check: the first 10 'Standing' samples recorded for user 8.
SELECT * FROM users WHERE user_id = 8 AND activity = 'Standing' LIMIT 10;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// define Spark context | |
SparkConf sparkConf = new SparkConf() | |
.setAppName("User's physical activity recognition") | |
.set("spark.cassandra.connection.host", "127.0.0.1") | |
.setMaster("local[*]"); | |
JavaSparkContext sc = new JavaSparkContext(sparkConf); | |
// retrieve data from Cassandra and create an CassandraRDD | |
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = javaFunctions(sc).cassandraTable("actitracker", "users"); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// create bucket of sorted data by ascending timestamp by (user, activity) | |
JavaRDD<Long> times = cassandraRowsRDD.select("timestamp") | |
.where("user_id=? AND activity=?", i, activity) | |
.withAscOrder() | |
.map(CassandraRow::toMap) | |
.map(entry -> (long) entry.get("timestamp")) | |
.cache(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// first find jumps to define the continuous periods of data | |
Long firstElement = times.first(); | |
Long lastElement = times.sortBy(time -> time, false, 1).first(); | |
JavaRDD<Long> firstRDD = timestamps.filter(record -> record > firstElement); | |
JavaRDD<Long> secondRDD = timestamps.filter(record -> record < lastElement); | |
// compute the difference between each timestamp | |
// and then if the difference is greater than 100 000 000, it must be different periods of recording, so we have a jump | |
// I have chosen 100 000 000 (the values are recording every 50 000 000 and around 0.05% are spaced by more than 100 000 000). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary; | |
import org.apache.spark.mllib.stat.Statistics; | |
private MultivariateStatisticalSummary summary; | |
public ExtractFeature(JavaRDD<Vector> data) { | |
this.summary = Statistics.colStats(data.rdd()); | |
} | |
// return (mean_acc_x, mean_acc_y, mean_acc_z) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Feature: mean absolute deviation of each axis from the supplied per-axis mean.
// NOTE(review): this fragment is truncated — the map chain is still open on
// the last line, and the declared JavaRDD<Vector> does not yet match the
// double[] elements produced here; presumably a .map(Vectors::dense) and a
// colStats(...).mean() reduction follow on the cut-off lines — confirm
// against the full source file. Trailing "| |" runs are scrape artifacts
// from the code-host page, not code.
/** | |
 * @return Vector [ (1 / n ) * ∑ |b - mean_b|, for b in {x,y,z} ] | |
 */ | |
public static double[] computeAvgAbsDifference(JavaRDD<double[]> data, double[] mean) { | |
// then for each point x compute x - mean | |
// then apply an absolute value: |x - mean| | |
JavaRDD<Vector> abs = data.map(record -> new double[]{Math.abs(record[0] - mean[0]), | |
Math.abs(record[1] - mean[1]), | |
Math.abs(record[2] - mean[2])}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Feature: mean resultant acceleration, 1/n * Σ √(x² + y² + z²).
// NOTE(review): truncated fragment — the chain is cut mid-expression, and the
// declared JavaRDD<Vector> does not match the scalar this map produces;
// presumably Math.sqrt / Vectors.dense maps plus a colStats(...).mean()
// reduction follow on the missing lines — confirm against the full source
// file. Trailing "| |" runs are scrape artifacts, not code.
/** | |
 * @return Double resultant = 1/n * ∑ √(x² + y² + z²) | |
 */ | |
public static double computeResultantAcc(JavaRDD<double[]> data) { | |
// first let's compute the square of each value and the sum | |
// compute then the root square: √(x² + y² + z²) | |
// to finish apply a mean function: 1/n * sum [√(x² + y² + z²)] | |
JavaRDD<Vector> squared = data.map(record -> Math.pow(record[0], 2) | |
+ Math.pow(record[1], 2) | |
+ Math.pow(record[2], 2)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Feature: average time between acceleration peaks.
// NOTE(review): truncated fragment — the method body continues past the last
// line shown here. Trailing "| |" runs are scrape artifacts, not code.
public Double computeAvgTimeBetweenPeak(JavaRDD<long[]> data) { | |
// define the maximum using the max function from MLlib | |
double[] max = this.summary.max().toArray(); | |
// keep the timestamp of data point for which the value is greater than 0.9 * max | |
// and sort it ! | |
// assumes record[0] is the timestamp and record[1]/max[1] the y-axis value — TODO confirm
JavaRDD<Long> filtered_y = data.filter(record -> record[1] > 0.9 * max[1]) | |
.map(record -> record[0]) | |
.sortBy(time -> time, true, 1); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Split data into 2 sets : training (60%) and test (40%). | |
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.6, 0.4}); | |
JavaRDD<LabeledPoint> trainingData = splits[0].cache(); | |
JavaRDD<LabeledPoint> testData = splits[1]; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>(); | |
int numClasses = 4; | |
String impurity = "gini"; | |
int maxDepth = 9; | |
int maxBins = 32; | |
// create model | |
final DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins); | |
// Evaluate model on training instances and compute training error |
OlderNewer