|
import java.io.{ File => JFile } |
|
import java.time._ |
|
import java.time.format.DateTimeFormatter |
|
import gremlin.scala._ |
|
import System._ |
|
import org.apache.tinkerpop.gremlin.tinkergraph.structure.{TinkerFactory, TinkerGraph} |
|
import org.apache.tinkerpop.gremlin.process.traversal.P |
|
import com.github.tototoshi.csv._ |
|
import scala.util.Try |
|
// ---------------------------------------------------------------------------
// In-memory graph holding the run vertex, its frames, locations and the
// measurement edges between them.
// ---------------------------------------------------------------------------
val ø = TinkerGraph.open.asScala

// Vertex / edge labels.
val Frame: String         = ":Frame"
val RunFirstFrame: String = ":RUN_FIRST_FRAME" // Run -> head of the frame list
val RunFrame: String      = ":RUN_FRAME"       // Run -> frame
val NextFrame: String     = ":NEXT_FRAME"      // frame -> following frame

// Property keys.
val Timestamp: Key[Long] = Key[Long]("timestamp")

// Root vertex that the frames are attached to.
val Run = ø + ":Run"
|
// Endless stream of (previous, current) frame pairs: starts at the given pair and
// follows NEXT_FRAME edges from the current frame (start -> ∞). Past the last frame
// the pairs become (last, None) and then (None, None) forever.
def frames(x0: Option[Vertex], x1: Option[Vertex]): Stream[(Option[Vertex], Option[Vertex])] = {
  // A `def`, not a `val`: the successor lookup must stay lazy and only run when the
  // tail of the stream is actually demanded (i.e. after the head has been consumed).
  def successor: Option[Vertex] = x1.flatMap(current => current.out(NextFrame).headOption)
  Stream.cons((x0, x1), frames(x1, successor))
}
|
|
|
/**
 * Attempts to insert a frame with the given timestamp at one position of the run's
 * frame list. The list is stored in the graph as a singly linked list
 * `Run -RUN_FIRST_FRAME-> f1 -NEXT_FRAME-> f2 -NEXT_FRAME-> ...`. Every frame created
 * here is also linked from the run through a `Run -RUN_FRAME-> frame` edge.
 *
 * Meant to be mapped over `frames(...)`, which supplies the (previous, current) window
 * at which insertion is tried.
 *
 * @param timestamp epoch seconds of the frame to insert
 * @param frames    (previous, current) pair; a `None` previous means "current is the head"
 * @return `Some(frame)` with the inserted (or already present) frame, or `None` when the
 *         timestamp belongs further down the list
 */
def insertFrame(timestamp: Long)(frames: (Option[Vertex], Option[Vertex])): Option[Vertex] = {

  // Removes every outgoing `label` edge of `v`. Gremlin traversals are lazy, so a bare
  // `.drop()` without a terminal step never runs; materialising the edges and removing
  // each one makes the deletion actually happen.
  def removeOutEdges(v: Vertex, label: String): Unit =
    v.outE(label).toList.foreach(_.remove())

  // Creates a frame vertex and links it from the run, so that every frame (including
  // the very first one) is reachable through RUN_FRAME.
  def newFrame(): Vertex = {
    val f = ø + (Frame, Timestamp -> timestamp)
    Run --- RunFrame --> f
    f
  }

  frames match {
    case (None, None) => // empty list: the new frame becomes the head
      val first = newFrame()
      Run --- RunFirstFrame --> first
      Some(first)

    case (None, Some(oldFirst)) if oldFirst.value2(Timestamp) > timestamp => // new head
      removeOutEdges(Run, RunFirstFrame)
      val first = newFrame()
      Run --- RunFirstFrame --> first
      first --- NextFrame --> oldFirst
      Some(first)

    case (_, Some(same)) if same.value2(Timestamp) == timestamp => Some(same) // already present

    case (Some(prev), Some(curr)) if curr.value2(Timestamp) > timestamp => // between prev and curr
      removeOutEdges(prev, NextFrame)
      val inner = newFrame()
      inner --- NextFrame --> curr
      prev --- NextFrame --> inner
      Some(inner)

    case (Some(prev), None) => // past the tail: append
      val last = newFrame()
      prev --- NextFrame --> last
      Some(last)

    // Only (_, Some(curr)) with curr earlier than timestamp is left: keep walking.
    case (_, Some(_)) => None
  }
}
|
// import csv

// Column of each CSV row that holds the "yyyy-MM-dd HH:mm:ss" timestamp.
val TS_INDEX = 13

// Source CSV. Defaults to the original location; set WEATHER_DATA_CSV to read another file.
val reader = CSVReader.open(new JFile(
  sys.env.getOrElse("WEATHER_DATA_CSV", "/Users/david/Projects/thesis-project/weather_data.csv")))
|
|
|
// Formatter for the CSV timestamp column. DateTimeFormatter is immutable and
// thread-safe, so it is built once here instead of on every parsed row.
val TimestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

/** Parses a CSV timestamp such as "2016-01-02 03:04:05" into a LocalDateTime (throws on malformed input). */
def format(t: String): LocalDateTime = LocalDateTime.parse(t, TimestampFormat)
|
// General method (took 6437 ms): walks the frame list from its head for every row, so it
// also works when the rows are not ordered by ascending timestamp.
//def insert(it: Iterator[Seq[String]]) = it.map(row => {
//  val timestamp = format(row(TS_INDEX)).toInstant(ZoneOffset.ofTotalSeconds(0)).getEpochSecond
//  frames(None, Run.out(RunFirstFrame).headOption).map(insertFrame(timestamp)).dropWhile(_.isEmpty).head.get
//}).toList
|
|
|
/**
 * Inserts one frame per CSV row and pairs each inserted frame with its row.
 *
 * The weather data is ordered by timestamp, so the search for the insertion point
 * starts at the previously inserted frame rather than the head of the list. That
 * turns every insertion into a tail append: 81 ms, against 6437 ms for walking from
 * the head each time.
 *
 * If a row is out of order, the walk starts from the head instead. Starting at a later
 * frame with no "previous" would otherwise be mistaken for the head of the list, and
 * the new frame would wrongly replace the real head.
 *
 * @param it CSV rows; column TS_INDEX holds the timestamp
 * @return (frame, row) pairs in input order
 */
def insertFramesIncreasing(it: Iterator[Seq[String]]) = it.foldLeft(List[(Vertex, Seq[String])]())((acc, row) => {
  val timestamp = format(row(TS_INDEX)).toEpochSecond(ZoneOffset.UTC)

  // Resume at the last inserted frame only when it does not lie after this row;
  // otherwise (first row, or an out-of-order row) start at the head of the list.
  val walkFrom = acc match {
    case (last, _) :: _ if last.value2(Timestamp) <= timestamp => Some(last)
    case _ => Run.out(RunFirstFrame).headOption
  }

  val inserted = frames(None, walkFrom).map(insertFrame(timestamp)).dropWhile(_.isEmpty).head.get

  (inserted, row) :: acc
}).reverse
|
// Bulk-load the CSV into the graph and time it (wall clock, ms). The foldLeft inside
// insertFramesIncreasing consumes the whole iterator before returning, so the reader
// can be closed right afterwards; `finally` also closes it if a row fails to parse.
val start = currentTimeMillis()

val framesData = try insertFramesIncreasing(reader.iterator) finally reader.close()

val diff = currentTimeMillis() - start
|
// Measurements are stored as edges from a frame to the location they were taken at.
val Location = ":Location"

val LocationValue = Key[String] ("location")

val Budapest = ø + (Location, LocationValue -> "Budapest")

val Temperature = ":FRAME_TEMPERATURE_LOC"

val Humidity = ":FRAME_HUMIDITY_LOC"

def Value[A] = Key[A]("value")

// CSV columns holding the temperature and humidity readings.
val TEMP_INDEX = 1

val HUMIDITY_INDEX = 3

// Adds a `label` edge from `frame` to Budapest carrying the Float reading found in
// column `index` of `row`. Missing or unparsable cells are skipped on purpose
// (best effort), so a frame may lack some readings.
def attachReading(frame: Vertex, row: Seq[String], index: Int, label: String): Unit =
  Try(row(index).toFloat).foreach(x => frame --- (label, Value[Float] -> x) --> Budapest)

// Run only for its side effects on the graph, hence foreach instead of map.
framesData.foreach { case (vertex, data) =>
  attachReading(vertex, data, TEMP_INDEX, Temperature)
  attachReading(vertex, data, HUMIDITY_INDEX, Humidity)
}
|
|
|
/**
 * Runs `c`, prints its duration as "time: <ms>" and returns its result.
 *
 * Uses the monotonic `nanoTime` clock, so wall-clock adjustments cannot distort the
 * reading. The printed value is still whole milliseconds.
 */
def measured[A](c: () => A): A = {
  val begin = nanoTime()
  val result = c()
  println(s"time: ${(nanoTime() - begin) / 1000000}")
  result
}
|
// Start asking...

// Days (UTC) on which at least one Budapest temperature reading was >= 10 celsius.
// Each day is printed once, in chronological order.
measured { () =>
  ø.V.hasLabel(Location).has(LocationValue, P.eq("Budapest"))
    .inE(Temperature).has(Value[Float], P.gte(10.0f))
    .outV.value(Timestamp).toList
    .map(ts => LocalDateTime.ofEpochSecond(ts, 0, ZoneOffset.UTC).toLocalDate)
    .distinct
    // LocalDate is only Comparable[ChronoLocalDate], so sort by the epoch day instead.
    .sortBy(_.toEpochDay)
    .foreach(println)
}
|
/**
 * Arithmetic mean of `values`, computed in a single pass and returned as a Float.
 *
 * An empty collection yields the Numeric's zero (as a Float) instead of NaN.
 */
def mean[A: Numeric](values: Iterable[A]): Float = {
  val n = implicitly[Numeric[A]]
  val (sum, count) = values.foldLeft((n.zero, 0)) { case ((acc, k), a) => (n.plus(acc, a), k + 1) }
  // Emptiness is decided by the element count alone. Matching the generic `sum`
  // against the Int literal 0 relied on boxed cross-type numeric equality.
  if (count == 0) n.toFloat(n.zero) else n.toFloat(sum) / count
}
|
// Epoch second 0 (1970-01-01T00:00:00Z). Upper-case `L` suffix: a lower-case `l` is easily
// misread as the digit 1 and is deprecated in recent Scala versions.
val ts = 0L
|
/** Calendar date, in UTC, of an epoch-seconds timestamp. */
def extractDate(ts: Long) = {
  val utcDateTime = Instant.ofEpochSecond(ts).atOffset(ZoneOffset.UTC)
  utcDateTime.toLocalDate
}
|
|
|
// Counter-example kept on purpose: an Instant carries no time zone, so LocalDate.from
// cannot derive a date from it and throws a DateTimeException at runtime. extractDate
// shows the working conversion. (Awkward API, but still better than the pre-Java-8 one.)
def extractDateBad(ts: Long) = {
  val instant = Instant.ofEpochSecond(ts)
  LocalDate.from(instant)
}
|
// Average temperature per calendar day (UTC), printed as date -> average.
// NOTE(review): this used to be labelled "Monthly avg temp", but the grouping key is
// extractDate(...), which yields a LocalDate, so the averages are per day, not per month.
// NOTE(review): each day's value is a mean of per-frame means, so every frame weighs the
// same regardless of how many temperature readings it carries.
measured { () =>
  Run.out(RunFrame)
    // Per frame: pair the frame vertex ("k") with its temperature values ("v") and
    // reduce them to a map frameDate -> mean temperature of that frame.
    .local(
      _.as("k")
        .outE(Temperature).value(Value[Float]).as("v")
        .select("k", "v")
        .groupBy(
          _.get("k").asInstanceOf[Vertex], // unchecked cast: select() hands back untyped values
          _.get("v").asInstanceOf[Float] // unchecked cast, same reason
        )
        .map(
          _.map{ case (v, l) => (extractDate(v.value2(Timestamp)), mean(l))}
        )
    )
    // Across frames: group the per-frame maps by their date and collect the frame means.
    .groupBy(
      _.keys.head, // assumes every per-frame map has exactly one entry; the types do not enforce it
      _.values.head // same assumption
    )
    .map(
      // Explicit lambda: `_.mapValues(mean)` did not compile, presumably because
      // eta-expanding the generic `mean` cannot resolve its implicit Numeric there.
      _.mapValues(x => mean(x))
    )
    // Takes the first (presumably only) result map; NOTE(review): List.head throws if
    // the traversal yields nothing.
    .toList.head
    .foreach(println)
}
|
|