Skip to content

Instantly share code, notes, and snippets.

@dszakallas
Last active April 13, 2016 21:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dszakallas/2666119bc142e2c914c075a733e0cc0e to your computer and use it in GitHub Desktop.
Save dszakallas/2666119bc142e2c914c075a733e0cc0e to your computer and use it in GitHub Desktop.
Dynamic graph for weather data

Temperature and humidity observations in Budapest, represented as a dynamic graph. Time is modeled as a series of frames (points in time).

Improvements:

  • support custom locations
  • hierarchical indexing based on time (currently the frames form a flat linked list)
  • gremlinize: the current code is far from optimal, with lots of Scala collection code interwoven with the Gremlin traversals
import java.io.{ File => JFile }
import java.time._
import java.time.format.DateTimeFormatter
import gremlin.scala._
import System._
import org.apache.tinkerpop.gremlin.tinkergraph.structure.{TinkerFactory, TinkerGraph}
import org.apache.tinkerpop.gremlin.process.traversal.P
import com.github.tototoshi.csv._
import scala.util.Try
// The graph: an in-memory TinkerGraph, wrapped for the gremlin-scala DSL.
val ø = TinkerGraph.open.asScala

// Vertex label of a frame (a point in time).
val Frame = ":Frame"
// Edge labels: Run -> first frame, Run -> frame, frame -> following frame.
val RunFirstFrame = ":RUN_FIRST_FRAME"
val RunFrame = ":RUN_FRAME"
val NextFrame = ":NEXT_FRAME"

// Frame property: the frame's point in time.
val Timestamp = Key[Long]("timestamp")

// Root vertex from which the list of frames is reached.
val Run = ø + ":Run"
// Lazy sliding window (previous, current) over the frame list, starting at the
// given pair and advancing one :NEXT_FRAME edge per element.
// Past the last frame the window becomes (Some(last), None) and then
// (None, None) forever, so the stream never ends: consumers must stop on their own.
def frames(x0: Option[Vertex], x1: Option[Vertex]): Stream[(Option[Vertex], Option[Vertex])] =
  Stream.cons((x0, x1), frames(x1, x1.flatMap(current => current.out(NextFrame).headOption)))
// One step of a sorted-linked-list insert, expressed on the graph.
//
// `frames` is a (previous, current) window as produced by `frames(...)`.
// Returns Some(frame) once a frame with `timestamp` is in place: either a newly
// created vertex or an existing frame that already has this timestamp.
// Returns None when `current` is still earlier than `timestamp`, meaning the
// caller should advance the window and try again.
// Every newly created frame is linked from Run by a :RUN_FRAME edge.
def insertFrame(timestamp: Long)(frames: (Option[Vertex], Option[Vertex])): Option[Vertex] = {
  // Creates a frame vertex for `timestamp` and registers it under Run.
  def createFrame(): Vertex = {
    val frame = ø + (Frame, Timestamp -> timestamp)
    Run --- RunFrame --> frame
    frame
  }

  frames match {
    case (None, None) => // insert into empty list
      val first = createFrame() // the original omitted the :RUN_FRAME edge here
      Run --- RunFirstFrame --> first
      Some(first)
    case (None, Some(oldFirst)) if oldFirst.value2(Timestamp) > timestamp => // insert as head
      // Gremlin traversals are lazy: without iterate() the drop() never executes.
      Run.outE(RunFirstFrame).drop().iterate()
      val newFirstFrame = createFrame()
      Run --- RunFirstFrame --> newFirstFrame
      newFirstFrame --- NextFrame --> oldFirst
      Some(newFirstFrame)
    case (_, Some(same)) if same.value2(Timestamp) == timestamp => // already present
      Some(same)
    case (Some(prev), Some(curr)) if curr.value2(Timestamp) > timestamp => // insert into inner pos
      prev.outE(NextFrame).drop().iterate()
      val inserted = createFrame()
      inserted --- NextFrame --> curr
      prev --- NextFrame --> inserted
      Some(inserted)
    case (Some(prev), None) => // insert as last
      val last = createFrame()
      prev --- NextFrame --> last
      Some(last)
    case (_, Some(_)) => // current frame is strictly earlier: do not insert yet
      None
  }
}
// import csv
// Index of the column holding the observation timestamp.
val TS_INDEX = 13
// Input CSV. Defaults to the original author's path; set the WEATHER_DATA_CSV
// environment variable to read a different file.
val reader = CSVReader.open(new JFile(
  sys.env.getOrElse("WEATHER_DATA_CSV", "/Users/david/Projects/thesis-project/weather_data.csv")))
// Pattern of the timestamps in the CSV (local date-time, no zone information).
// DateTimeFormatter is immutable and thread-safe, so it is built once, not once per row.
val TimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
// Parses a CSV timestamp such as "2016-04-13 21:25:00"; throws DateTimeParseException otherwise.
def format(t: String) = LocalDateTime.parse(t, TimestampFormatter)
// General method: walks the list from the head for every row, so it works for
// rows in any order. Took 6437 ms on the sample data.
//def insert(it: Iterator[Seq[String]]) = it.map(row => {
//  val timestamp = format(row(TS_INDEX)).toEpochSecond(ZoneOffset.UTC)
//  frames(None, Run.out(RunFirstFrame).headOption).map(insertFrame(timestamp)).dropWhile(_.isEmpty).head.get
//}).toList

// The data is expected to be ordered by timestamp, so nearly every insert is a
// tail insert. Instead of walking from the head, resume from the frame created
// for the previous row. This took 81 ms on the same data.
// If a row goes back in time, fall back to walking from the head. Resuming from
// a later frame would make insertFrame treat that frame as the head of the list.
//
// Returns every row paired with its frame vertex, in input order.
// Timestamps are interpreted as UTC and stored as epoch seconds.
// NOTE(review): every row is parsed as data — confirm the CSV has no header row.
def insertFramesIncreasing(it: Iterator[Seq[String]]) = {
  // Advances the (previous, current) window from `start` until the frame is placed.
  def insertFrom(start: Option[Vertex], timestamp: Long): Vertex =
    frames(None, start).map(insertFrame(timestamp)).dropWhile(_.isEmpty).head.get

  it.foldLeft(List[(Vertex, Seq[String])]()) { (acc, row) =>
    val timestamp = format(row(TS_INDEX)).toEpochSecond(ZoneOffset.UTC)
    val start = acc match {
      case (lastFrame, _) :: _ if lastFrame.value2(Timestamp) <= timestamp => Some(lastFrame)
      case _ => Run.out(RunFirstFrame).headOption
    }
    (insertFrom(start, timestamp), row) :: acc
  }.reverse
}
// Build the frame list from the CSV and time it. The fold consumes the whole
// iterator eagerly, so the reader is closed right afterwards (also on failure).
val start = currentTimeMillis()
val framesData = try insertFramesIncreasing(reader.iterator) finally reader.close()
val diff = currentTimeMillis() - start
// The single location currently modelled.
val Location = ":Location"
val LocationValue = Key[String] ("location")
val Budapest = ø + (Location, LocationValue -> "Budapest")
// Edge labels frame -> location; each edge carries the measured value.
val Temperature = ":FRAME_TEMPERATURE_LOC"
val Humidity = ":FRAME_HUMIDITY_LOC"
def Value[A] = Key[A]("value")
// Column indices of the measurements in a CSV row.
val TEMP_INDEX = 1
val HUMIDITY_INDEX = 3

// Adds a `label` edge from `frame` to Budapest carrying the value parsed from row(index).
// A missing or non-numeric cell is skipped on purpose. Failures while creating
// the edge are not swallowed.
def linkMeasurement(frame: Vertex, row: Seq[String], index: Int, label: String): Unit =
  Try(row(index).toFloat).foreach { value => frame --- (label, Value[Float] -> value) --> Budapest }

framesData.foreach { case (vertex, data) =>
  linkMeasurement(vertex, data, TEMP_INDEX, Temperature)
  linkMeasurement(vertex, data, HUMIDITY_INDEX, Humidity)
}
// Runs `c` once and prints the elapsed wall-clock time as "time: <millis>".
def measured(c: () => Unit): Unit = {
  val started = System.currentTimeMillis()
  c()
  val elapsed = System.currentTimeMillis() - started
  println(s"time: $elapsed")
}
// Start asking...
// Days (UTC) on which the temperature in Budapest reached at least 10 degrees
// Celsius at some point. Each day is printed once, in chronological order.
measured { () =>
  ø.V.hasLabel(Location).has(LocationValue, P.eq("Budapest"))
    .inE(Temperature).has(Value[Float], P.gte(10.0f))
    .outV.value(Timestamp)
    .toList
    .map(ts => LocalDateTime.ofEpochSecond(ts, 0, ZoneOffset.UTC).toLocalDate)
    .distinct
    .sortBy(_.toEpochDay)
    .foreach(println)
}
// Arithmetic mean of `values`, converted to Float.
// An empty input yields zero (the Numeric's zero as a Float) rather than NaN.
// The empty check looks at the element count, not at a literal-0 pattern on the
// generic sum.
def mean[A: Numeric](values: Iterable[A]): Float = {
  val num = implicitly[Numeric[A]]
  val (sum, count) = values.foldLeft((num.zero, 0)) { case ((acc, k), a) => (num.plus(acc, a), k + 1) }
  if (count == 0) num.toFloat(num.zero) else num.toFloat(sum) / count
}
// Sample timestamp (the epoch). Not referenced elsewhere in the visible script.
val ts = 0L

// Calendar date in UTC of a timestamp given in epoch seconds.
def extractDate(ts: Long) =
  LocalDateTime.ofEpochSecond(ts, 0, ZoneOffset.UTC).toLocalDate

// Pitfall demo, and the winner for worst API design: this compiles but throws a
// java.time.DateTimeException at runtime. An Instant carries no time zone, so it
// cannot be converted to a LocalDate directly. (OK, the previous API was worse.)
def extractDateBad(ts: Long) =
  LocalDate.from(Instant.ofEpochSecond(ts))
// Monthly average temperature (months in UTC), printed in chronological order.
// Each frame's temperature values are averaged first. Those per-frame means are
// then averaged over all frames of the same month. The original grouped by
// LocalDate, which gave daily averages.
measured { () =>
  Run.out(RunFrame)
    .local(
      _.as("k")
        .outE(Temperature).value(Value[Float]).as("v")
        .select("k", "v")
        .groupBy(
          _.get("k").asInstanceOf[Vertex], // BAD: select loses the static types
          _.get("v").asInstanceOf[Float]   // BAD: same reason
        )
        .map(
          _.map { case (v, l) => (YearMonth.from(extractDate(v.value2(Timestamp))), mean(l)) }
        )
    )
    // NOTE(review): frames without a temperature edge presumably yield an empty
    // map from the local group step — drop them, otherwise `_.keys.head` throws.
    .filter(_.nonEmpty)
    .groupBy(
      _.keys.head,  // each local result comes from a single frame, hence a single key
      _.values.head
    )
    .map(
      _.mapValues(x => mean(x)) // eta-expanding `mean` needs its type argument and implicit resolved
    )
    .toList
    .headOption
    .foreach(_.toSeq.sortBy { case (month, _) => month.toString }.foreach(println))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment