import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;
import org.apache.spark.mllib.stat.Statistics;

private MultivariateStatisticalSummary summary;

public ExtractFeature(JavaRDD<Vector> data) {
    // one pass over the data computes the column-wise statistics (mean, variance, min, max, ...)
    this.summary = Statistics.colStats(data.rdd());
}
// return (mean_acc_x, mean_acc_y, mean_acc_z) |
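The `colStats` summary exposes per-column aggregates such as `mean()`, `variance()`, `min()` and `max()`. As a plain-Java illustration (no Spark required, `MeanFeature` and `columnMeans` are hypothetical names for this sketch), the column-wise mean it computes over the three acceleration axes is simply:

```java
import java.util.Arrays;
import java.util.List;

public class MeanFeature {

    // Column-wise mean over rows of (acc_x, acc_y, acc_z),
    // mirroring what Statistics.colStats(...).mean() returns for 3-dimensional vectors.
    static double[] columnMeans(List<double[]> rows) {
        double[] sums = new double[3];
        for (double[] row : rows) {
            for (int i = 0; i < 3; i++) {
                sums[i] += row[i];
            }
        }
        for (int i = 0; i < 3; i++) {
            sums[i] /= rows.size();
        }
        return sums;
    }

    public static void main(String[] args) {
        List<double[]> rows = Arrays.asList(
                new double[]{1.0, 2.0, 3.0},
                new double[]{3.0, 4.0, 5.0});
        System.out.println(Arrays.toString(columnMeans(rows))); // prints [2.0, 3.0, 4.0]
    }
}
```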
// first find jumps to define the continuous periods of data
Long firstElement = times.first();
Long lastElement = times.sortBy(time -> time, false, 1).first();
// drop the first timestamp in one copy and the last in the other,
// so that zipping them pairs each timestamp with its successor
JavaRDD<Long> firstRDD = times.filter(record -> record > firstElement);
JavaRDD<Long> secondRDD = times.filter(record -> record < lastElement);
// compute the difference between each timestamp and its successor:
// if the difference is greater than 100 000 000 ns, the samples belong to different recording
// periods, so we have a jump. I chose 100 000 000 because the values are recorded every
// 50 000 000 ns and only around 0.05% are spaced by more than 100 000 000 ns.
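The jump detection itself — pairing each timestamp with its successor and flagging gaps above the threshold — can be sketched in plain Java (the RDD version does the same thing by zipping the two shifted RDDs; `JumpFinder` and `findJumps` are hypothetical names for this sketch):

```java
import java.util.ArrayList;
import java.util.List;

public class JumpFinder {

    // 2x the 50 000 000 ns sampling interval, as in the post
    static final long THRESHOLD = 100_000_000L;

    // Returns the indices i where timestamps[i+1] - timestamps[i] > THRESHOLD,
    // i.e. the boundaries between continuous recording periods.
    static List<Integer> findJumps(long[] timestamps) {
        List<Integer> jumps = new ArrayList<>();
        for (int i = 0; i + 1 < timestamps.length; i++) {
            if (timestamps[i + 1] - timestamps[i] > THRESHOLD) {
                jumps.add(i);
            }
        }
        return jumps;
    }

    public static void main(String[] args) {
        // gap of 200 000 000 ns between the 3rd and 4th samples -> one jump at index 2
        long[] ts = {0L, 50_000_000L, 100_000_000L, 300_000_000L};
        System.out.println(findJumps(ts)); // prints [2]
    }
}
```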
// create a bucket of data sorted by ascending timestamp for each (user, activity) pair
JavaRDD<Long> times = cassandraRowsRDD.select("timestamp")
        .where("user_id=? AND activity=?", i, activity)
        .withAscOrder()
        .map(CassandraRow::toMap)
        .map(entry -> (long) entry.get("timestamp"))
        .cache();
// define the Spark context
SparkConf sparkConf = new SparkConf()
        .setAppName("User's physical activity recognition")
        .set("spark.cassandra.connection.host", "127.0.0.1")
        .setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// retrieve the data from Cassandra and create a CassandraJavaRDD
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = javaFunctions(sc).cassandraTable("actitracker", "users");
CREATE KEYSPACE actitracker WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE users (
    user_id   int,
    activity  text,
    timestamp bigint,
    acc_x     double,
    acc_y     double,
    acc_z     double,
    PRIMARY KEY ((user_id, activity), timestamp)
);

COPY users FROM '/path_to_your_data/data.csv' WITH HEADER = true;

SELECT * FROM users WHERE user_id = 8 AND activity = 'Standing' LIMIT 10;